Skip to content

Instantly share code, notes, and snippets.

@9b
Last active July 31, 2021 05:17
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 9b/11379392 to your computer and use it in GitHub Desktop.
Save 9b/11379392 to your computer and use it in GitHub Desktop.
from passivetotal import PassiveTotal
api = 'your key'
pt = PassiveTotal(api)
pt.setLogging('DEBUG')
print pt.search('www.trendmicro-update.org')
print pt.classify('www.trendmicro-update.org', 'targeted')
print pt.tag('www.trendmicro-update.org', 'china', 'add')
import requests, json, logging, sys
class PassiveTotal:
def __init__(self, apikey):
self.__apikey = apikey
self.__classifications = [ 'targeted', 'crime', 'benign', 'multiple' ]
self.__actions = [ 'add', 'remove' ]
self.__endpoint = 'https://www.passivetotal.org/api/'
self.__logger = logging.getLogger('PassiveTotal')
def setLogging(self, level):
logger = logging.getLogger('PassiveTotal')
if level == "INFO":
logger.setLevel(logging.INFO)
elif level == "WARN":
logger.setLevel(logging.WARN)
elif level == "DEBUG":
logger.setLevel(logging.DEBUG)
elif level == "ERROR":
logger.setLevel(logging.ERROR)
else:
pass
format = logging.Formatter('\033[1;32m%(levelname)-5s %(module)s:%(funcName)s():%(lineno)d %(asctime)s\033[0m| %(message)s')
shandler = logging.StreamHandler(sys.stdout)
shandler.setFormatter(format)
logger.addHandler(shandler)
return logger
def classify(self, value, classification):
url = self.__endpoint + 'classify'
if classification.lower() not in self.__classifications:
raise Exception("%s is not a valid classification type. Use %s." % ( classification, str(self.__classifications) ) )
params = { 'apikey': self.__apikey, 'classification': classification.lower(), 'value': value }
response = requests.post(url, params=params)
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params)))
if response.status_code == 200:
return json.loads(response.content)
else:
self.__logger.error('Query failed: %s' % response.content)
raise Exception('Query failed')
def tag(self, value, tag, action):
if action.lower() not in self.__actions:
raise Exception("%s is not a valid tag action. Use %s." % ( action, str(self.__actions) ) )
if action.lower() == 'add':
url = self.__endpoint + 'tag/add'
else:
url = self.__endpoint + 'tag/remove'
params = { 'apikey': self.__apikey, 'tag': tag, 'value': value }
response = requests.post(url, params=params)
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params)))
if response.status_code == 200:
return json.loads(response.content)
else:
self.__logger.error('Query failed: %s' % response.content)
raise Exception('Query failed')
def search(self, value):
url = self.__endpoint + 'passive'
params = { 'apikey': self.__apikey, 'value': value }
response = requests.post(url, params=params)
self.__logger.debug("Response %d: %s %s" % (response.status_code, url, str(params)))
if response.status_code == 200:
return json.loads(response.content)
else:
self.__logger.error('Query failed: %s' % response.content)
raise Exception('Query failed')
"""PassiveTotal CLI.
Usage:
ptcli.py query <indicator> [--raw]
ptcli.py classify <indicator> (targeted|crime|multiple|benign) [--bulk]
ptcli.py (add|remove) tag <indicator> <tag> [--bulk]
ptcli.py (-h | --help)
ptcli.py --version
Options:
-h --help Show this screen.
--version Show version.
--raw Dump the results in raw JSON.
--bulk Read values from a file instead of the CLI.
"""
from docopt import docopt
from passivetotal import PassiveTotal
import os, sys
API = 'YOUR KEY'
if __name__ == '__main__':
arguments = docopt(__doc__, version='PassiveTotal 1.0')
pt = PassiveTotal(API)
if arguments['query']:
response = pt.search(arguments['<indicator>'])
if response['success']:
if arguments['--raw']:
print response
else:
response = response['results']
print "[*] Query:", response['focus']
print "[*] First Seen:", response['firstSeen']
print "[*] Last Seen:", response['lastSeen']
print "[*] Resolve Count: ", response['rawCount']
print "[*] Resolutions"
for resolve in response['resolutions']:
print "=>", resolve['resolve'], resolve['firstSeen'], resolve['lastSeen'], [ str(x) for x in resolve['source'] ]
else:
print response['error']
if arguments['classify']:
if not arguments['--bulk']:
if arguments['targeted']:
response = pt.classify(arguments['<indicator>'], 'targeted')
elif arguments['crime']:
response = pt.classify(arguments['<indicator>'], 'crime')
elif arguments['multiple']:
response = pt.classify(arguments['<indicator>'], 'multiple')
else:
response = pt.classify(arguments['<indicator>'], 'benign')
if response['success']:
print response['message']
else:
print response['error']
else:
if os.path.exists(arguments['<indicator>']):
f = open(arguments['<indicator>'], 'r')
items = [ x.strip() for x in f.readlines() ]
f.close()
for item in items:
if arguments['targeted']:
response = pt.classify(item, 'targeted')
elif arguments['crime']:
response = pt.classify(item, 'crime')
elif arguments['multiple']:
response = pt.classify(item, 'multiple')
else:
response = pt.classify(item, 'benign')
if response['success']:
print item, response['message']
else:
print item, response['error']
if arguments['tag']:
if not arguments['--bulk']:
if arguments['add']:
response = pt.tag(arguments['<indicator>'], arguments['<tag>'], 'add')
else:
response = pt.tag(arguments['<indicator>'], arguments['<tag>'], 'remove')
if response['success']:
print response['message']
else:
print response['error']
else:
if os.path.exists(arguments['<indicator>']):
f = open(arguments['<indicator>'], 'r')
items = [ x.strip() for x in f.readlines() ]
f.close()
for item in items:
if arguments['add']:
response = pt.tag(item, arguments['<tag>'], 'add')
else:
response = pt.tag(item, arguments['<tag>'], 'remove')
if response['success']:
print item, response['message']
else:
print item, response['error']
@krmaxwell
Copy link

If you put this in a proper repository with licensing information, I'd be eternally grateful. :)

@asieira
Copy link

asieira commented Nov 20, 2014

+1

@asieira
Copy link

asieira commented Nov 20, 2014

Also, would you consider submitting this to pypi so I can install with pip and do proper version management?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment