Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Automate BePush indicator processing based on PassiveTotal monitor notifications.
#!/usr/bin/env python
"""PassiveTotal script to automate BePush processing based on monitors.

This script will query for the items in your account monitor list and use those
values in order to get all passive DNS notifications. Each notification will be
processed which includes tagging, classifying and sharing out to Facebook's
ThreatExchange. Running this script daily will essentially automate most of the
work required to keep up with BePush.
"""

# NOTE(review): the original author string was truncated after "(" (a contact
# address appears to have been lost); only the name is kept here.
__author__ = 'Brandon Dixon'
__version__ = '1.0.0'
__description__ = ""
__keywords__ = ['crimeware', 'facebook', 'bepush']
# IPy is listed because the script imports it alongside the other packages.
__requirements__ = ['pytx', 'passivetotal', 'requests', 'IPy']
import datetime
import functools
import logging
import time
from inspect import getframeinfo, stack

import requests
from IPy import IP
from passivetotal.libs.account import AccountClient
from passivetotal.libs.actions import ActionsClient
from pytx import ThreatDescriptor
from pytx.access_token import access_token
from pytx.vocabulary import PrivacyType
from pytx.vocabulary import ShareLevel
from pytx.vocabulary import ThreatExchange as te
# Module-wide logger used by every function in this script.
logger = logging.getLogger('automator')

# Credentials are intentionally left blank; fill them in before running.
# TX_* values are passed to pytx / ThreatExchange, PT_* values to the
# PassiveTotal clients created in main().
TX_APP_ID = ""
TX_APP_SECRET = ""
PT_USERNAME = ""
PT_API_KEY = ""

# Extra HTTP headers sent with every ThreatExchange request.
TX_HEADERS = {'User-Agent': 'PassiveTotalIntegration/v1.0'}
def watchdog(function):
    """Decorate ``function`` so each call is logged with its call site and run time.

    Two debug records are written to the module logger per call: one before
    the wrapped function runs and one after, the latter including the elapsed
    wall-clock time in seconds ("ttc").

    :param function: callable to wrap
    :return: wrapper that forwards all arguments and returns the wrapped
        function's return value unchanged
    """
    # wraps() keeps the wrapped function's __name__/__doc__ on the wrapper so
    # introspection and log output still refer to the real function.
    @functools.wraps(function)
    def watchdog_bark(*args, **kwargs):
        # stack()[1] is the frame that invoked the wrapper, i.e. the call site.
        caller = getframeinfo(stack()[1][0])
        fname = caller.filename.split('/')[-1]
        logger.debug("before %s:%d %s",
                     fname, caller.lineno, function.__name__)
        start = time.time()
        return_value = function(*args, **kwargs)
        logger.debug("after %s:%d %s (ttc: %f)",
                     fname, caller.lineno, function.__name__,
                     time.time() - start)
        return return_value
    return watchdog_bark
def offset_time_past(delta):
    """Return the datetime that lies ``delta`` days before the current time.

    :param delta: number of days to step back from now
    :return: naive ``datetime.datetime`` (local time) ``delta`` days in the past
    """
    # The result must be a datetime (not a bare timedelta) because main()
    # compares it against parsed notification dates.
    return datetime.datetime.now() - datetime.timedelta(days=delta)
def value_type(value):
    """Identify whether the passed value is an IP address or a domain.

    :param value: indicator string to classify
    :return: ``'IP_ADDRESS'`` when IPy can parse the value, ``'DOMAIN'``
        otherwise
    """
    # NOTE(review): the IP check was missing from the block (it returned
    # 'IP_ADDRESS' unconditionally). It is restored here with the IPy ``IP``
    # class the file already imports; confirm this matches the intended check.
    try:
        IP(value)
    except (ValueError, TypeError):
        # Anything IPy refuses to parse is treated as a domain name.
        return 'DOMAIN'
    return 'IP_ADDRESS'
def get_descriptor_struct(query_value):
    """Get a descriptor stub to fill in with any dynamic data.

    :param query_value: indicator (domain or IP address) the descriptor is for
    :return: dict holding the static descriptor fields; ``status`` is
        ``"MALICIOUS"`` for domains and ``"SUSPICIOUS"`` for everything else
    """
    descriptor = dict()
    descriptor['privacy_type'] = PrivacyType.VISIBLE
    descriptor['share_level'] = ShareLevel.WHITE
    descriptor['indicator'] = query_value
    descriptor['type'] = value_type(query_value)
    # Domains are flagged outright; other indicator types only as suspicious.
    # (Without the else branch the status was always overwritten.)
    if descriptor['type'] == "DOMAIN":
        descriptor['status'] = "MALICIOUS"
    else:
        descriptor['status'] = "SUSPICIOUS"
    descriptor['metadata'] = "CRIMEWARE"
    descriptor['severity'] = "SEVERE"
    return descriptor
def get_facebook_id(query_value):
    """Find this application's ThreatExchange descriptor matching the query value.

    Despite the name, the whole descriptor dict is returned; callers read the
    Facebook ID from its ``'id'`` key.

    :param query_value: indicator text to search for
    :return: dict form of the matching descriptor, or None when none matches
    """
    access_token(TX_APP_ID, TX_APP_SECRET)
    results = ThreatDescriptor.objects(text=query_value, strict_text=False,
                                       owner=TX_APP_ID, headers=TX_HEADERS)
    fb_raw = None
    for result in results:
        tmp = result.to_dict()
        indicator = tmp.get('indicator', dict())
        indicator = indicator.get('indicator', '')
        # The search is issued with strict_text=False, so skip any result whose
        # indicator does not actually contain the value we asked for.
        if query_value not in indicator:
            continue
        # No early exit: if several results match, the last one is returned.
        fb_raw = tmp
    logger.debug("FB-Response: %s" % str(fb_raw))
    return fb_raw
def tx_post(notification):
    """Post the indicator from a notification to Facebook ThreatExchange.

    A descriptor is built from the notification's ``result`` value. When
    get_facebook_id() finds an existing descriptor it is updated with an HTTP
    POST to that descriptor's URL; otherwise a new descriptor is created.

    :param notification: dict providing the ``result``, ``query`` and
        ``datetime`` keys
    :return: False when the update request answers with a non-2xx status,
        True otherwise
    """
    access_token(TX_APP_ID, TX_APP_SECRET)
    query_value = notification.get('result', None)
    tmp = get_descriptor_struct(query_value)
    focus = notification.get('query').strip(".")
    # Keep only the leading YYYY-MM-DD part of the timestamp.
    adjusted = notification.get('datetime')[:10]
    tmp['description'] = "Infrastructure related to %s based on passive DNS. "\
                         "Discovered on %s using PassiveTotal monitors." % \
                         (focus, adjusted)
    reference = get_facebook_id(query_value)
    logger.debug("POSTing: %s" % str(tmp))

    if not reference:
        logger.debug("Inserting new record")
        # NOTE(review): the call on this path was truncated in the source
        # (only ", headers=TX_HEADERS)" survived). It is reconstructed as a
        # pytx descriptor creation - confirm against the pytx API.
        ThreatDescriptor.new(params=tmp, headers=TX_HEADERS)
        return True

    logger.debug("Updating existing record")
    tmp['id'] = reference.get('id', '')
    url = te.URL + str(tmp['id'])
    access_token_tx = "%s|%s" % (TX_APP_ID, TX_APP_SECRET)
    url = "%s?access_token=%s" % (url, access_token_tx)
    # NOTE(review): the call target was lost in the source ("response =,");
    # requests.post(url, ...) is assumed from the surviving keyword arguments.
    response = requests.post(url, data=tmp, headers=TX_HEADERS, timeout=7)
    # Any 2xx answer counts as success (the old check only accepted exactly
    # 200 or 299).
    if not 200 <= response.status_code < 300:
        return False
    return True
def group_post():
    """Post a digest of activity to the Facebook group.

    Not implemented yet.

    :raises NotImplementedError: always
    """
    # NotImplemented is a comparison sentinel, not an exception; raising it
    # fails with a TypeError on Python 3. NotImplementedError is the exception.
    raise NotImplementedError("group_post is not implemented yet")
def main():
    """Run the main program.

    For every account monitor tagged "bepush", fetch its notifications and
    process each recent passive DNS result: share it to ThreatExchange, tag it
    and classify it in PassiveTotal.
    """
    pt = AccountClient(PT_USERNAME, PT_API_KEY)
    actions = ActionsClient(PT_USERNAME, PT_API_KEY)
    monitors = pt.get_account_monitors()
    for monitor in monitors.get('monitors', list()):
        # Only monitors tagged for BePush are processed.
        if "bepush" not in monitor.get('tags', list()):
            continue
        query_value = str(monitor.get('focus', ''))
        notifications = pt.get_account_notifications(query=query_value)
        logger.debug("%s notifications: %s" % (query_value, str(notifications)))
        current_range = offset_time_past(2)  # Step back two days
        for notification in notifications.get('notifications', list()):
            if notification.get('dataset', '') != 'passive-dns':
                continue
            # Compare on the YYYY-MM-DD part only; skip anything older than
            # the two-day window.
            short_date = notification.get('datetime')[:10]
            compare_date = datetime.datetime.strptime(short_date, "%Y-%m-%d")
            if compare_date < current_range:
                continue
            result_value = notification.get('result', None)
            if not result_value:
                continue
            # Drop any leading/trailing dots before the value is shared; the
            # same cleaned value is used for tagging and classification.
            result_value = result_value.strip(".")
            notification['result'] = result_value
            if not tx_post(notification):
                logger.warning("ThreatExchange post failed for %s",
                               result_value)
            # NOTE(review): only the trailing "tags=[...])" of this call
            # survived in the source; the method name and ``query`` argument
            # are reconstructed - confirm against passivetotal's ActionsClient.
            actions.add_tags(query=result_value,
                             tags=['bepush', 'crimeware'])
            if value_type(result_value) == "DOMAIN":
                classification = "malicious"
            else:
                classification = "suspicious"
            # NOTE(review): the source computed ``classification`` but never
            # used it; this call is presumed to be the missing consumer -
            # confirm against passivetotal's ActionsClient.
            actions.set_classification_status(query=result_value,
                                              classification=classification)
            # group_post_status = group_post(notification)
# Run the automation only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.