Skip to content

Instantly share code, notes, and snippets.

@ryanlovett
Last active March 11, 2016 18:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryanlovett/b85745e994091ed8a566 to your computer and use it in GitHub Desktop.
Save ryanlovett/b85745e994091ed8a566 to your computer and use it in GitHub Desktop.
A script that downloads SNS AID data from Berkeley's ISP
#!/usr/bin/env python3
'''
Download and cache SNS AID list.
API filter examples:
# last_seen_ts (returns entries where last_seen_ts >= '2015-04-09'):
# Note: The time stamp can be made even more precise using the format of
# <Date>T<hour>:<min>:<sec><TZ offset (optional)>
?last_seen_ts=2015-04-09
# single service
?service=ssh
# multiple services
?service=ssh,mysql
# time and service:
?last_seen_ts=2015-06-25T16:00:00-7:00&service=mysql,ssh
'''
import os
import sys
import time
import json
from optparse import OptionParser
from requests.packages.urllib3.util import Retry
from requests.adapters import HTTPAdapter
from requests import Session, exceptions
# Default endpoint queried for the aggressive-IP list; can be overridden
# with the -u command-line option.
API_URL = "https://api.security.berkeley.edu/api/aggressive_ips"
# Total number of retries handed to urllib3's Retry policy in download_data().
MAX_RETRIES = 5
# HTTP status codes that trigger a retry (passed as Retry's status_forcelist).
RETRY_STATUS_LIST = [502] # We see this from time to time
def validate_cache(cache_file, expire):
    '''Check that the cache exists, is fresh, and is valid JSON.

    Returns None when the cache is usable and raises otherwise, so callers
    can treat any exception as "the cache must be refreshed".

    cache_file -- path of the cache file to inspect.
    expire     -- maximum acceptable age of the file, in seconds.

    Raises:
        FileNotFoundError -- the cache file does not exist.
        ValueError        -- the cache is older than `expire` seconds, or
                             its contents are not valid UTF-8 encoded JSON.
        OSError           -- the file could not be stat'ed or read.
    '''
    if not os.path.exists(cache_file):
        raise FileNotFoundError('cache file %s does not exist' % cache_file)

    # The cache is stale once its modification time plus the expiration
    # window lies in the past.
    if os.stat(cache_file).st_mtime + expire < time.time():
        raise ValueError('cache file %s is older than %s seconds'
                         % (cache_file, expire))

    # Parse purely for validation; the result is discarded. The context
    # manager guarantees the handle is closed even if decoding fails.
    with open(cache_file, 'rb') as f:
        json.loads(f.read().decode('utf-8'))
def download_data(url, key, timeout=60):
    '''Fetch the JSON document at `url`, authenticating with an API token.

    url     -- full URL to request (including any query-string filters).
    key     -- API key, sent in the "Authorization: Token token=<key>" header.
    timeout -- seconds to wait for the server before giving up; without a
               timeout a stalled connection would block the script forever.

    Returns the decoded JSON body of a 200 response.

    Raises requests.exceptions.HTTPError when the final response carries an
    error status or any status other than 200. Other requests exceptions
    (connection errors, timeouts, exhausted retries) propagate unchanged.
    '''
    headers = {"Authorization": "Token token=%s" % key}
    # Retry up to MAX_RETRIES times, also on the statuses in RETRY_STATUS_LIST.
    retries = Retry(total=MAX_RETRIES, status_forcelist=RETRY_STATUS_LIST)
    # The context manager closes the session's pooled connections on exit.
    with Session() as s:
        s.mount("https://", HTTPAdapter(max_retries=retries))
        r = s.get(url=url, headers=headers, timeout=timeout)
        # Raises for 4xx/5xx responses.
        r.raise_for_status()
        if r.status_code != 200:
            # A non-200 success (e.g. 204) has no usable payload; fail loudly
            # instead of returning None, which the caller would cache.
            raise exceptions.HTTPError(
                "Unexpected HTTP status %d from %s" % (r.status_code, url),
                response=r)
        return r.json()
def write_cache(cache_file, data):
    '''Serialize `data` as indented JSON and write it to `cache_file`.

    cache_file -- path of the file to (over)write.
    data       -- any JSON-serializable object.

    The file is written as UTF-8 bytes. Raises OSError if the file cannot
    be opened or written, and TypeError if `data` is not JSON-serializable.
    '''
    s = json.dumps(data, separators=(',', ': '), indent=4)
    # Use a context manager so the data is flushed and the handle closed
    # deterministically rather than whenever the object is collected.
    with open(cache_file, 'wb') as f:
        f.write(s.encode('utf-8'))
if __name__ == "__main__":
    # Command-line interface: every option has a default except the API key.
    parser = OptionParser()
    parser.add_option("-u", dest="url", default=API_URL, help="API URL")
    parser.add_option("-k", dest="key", help="API key")
    parser.add_option("-f", dest="cache", default='/var/cache/sns-aid.json',
                      help="sns aid cache file")
    parser.add_option("-e", dest="expire", type='int', default=86400,
                      help="cache expiration time (s)")
    (options, args) = parser.parse_args()

    # The -k option takes precedence; otherwise fall back to the
    # SNS_AID_KEY environment variable. An empty value counts as missing.
    key = options.key or os.environ.get('SNS_AID_KEY')
    if not key:
        # sys.exit() with a string prints it to stderr and exits with
        # status 1, keeping the error out of stdout.
        sys.exit('No API key defined.')

    # Exit if the cache is valid. Any validation failure simply means the
    # cache must be refreshed; catching Exception (not a bare except) lets
    # KeyboardInterrupt and SystemExit propagate.
    try:
        validate_cache(options.cache, options.expire)
    except Exception:
        pass
    else:
        sys.exit(0)

    data = download_data(options.url, key)
    write_cache(options.cache, data)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment