@benjschmitt
Created June 13, 2016 22:46
example of OpenDNS/S3 file access/review

#!/usr/bin/env python
# quick script for demonstrating openDNS log access via S3
# TODO: accept dates (or date ranges) as input; hash values from S3 and from the
# filesystem would be useful for investigative support
# bschmitt - 20160605
import re
import os
import gzip
import boto
import sys
import operator

# global vars/const
LOCAL_PATH = ''  # full filesystem path to the base download directory
MY_BUCKET = ''   # simple bucket name
regex_ipv4 = re.compile(r"\(A\)")
regex_ipv6 = re.compile(r"\(AAAA\)")
dump = list()

def eval_dir_path(full_path):
    """ validate the directory portion of a path, creating it if necessary
    :param full_path: filesystem path to the target file
    :rtype : void
    """
    # strip the filename and rebuild the directory path
    tmp = full_path.split('/')
    reduced = ""
    for x in tmp[:-1]:
        reduced = reduced + '/' + x
    reduced = reduced[1:]
    if not os.path.isdir(reduced):
        os.makedirs(reduced)
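
# For absolute paths, the loop above is roughly equivalent to the stdlib form:
#   target_dir = os.path.dirname(full_path)
#   if not os.path.isdir(target_dir):
#       os.makedirs(target_dir)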

def extract_zip_stdout(dirpath):
    """ walk a directory of .gz log files and stream each one into the dump list
    :param dirpath: filesystem path to the target directory (openDNS and the S3
                    bucket both use the %Y-%m-%d format for daily directories)
    :rtype : void
    """
    path = dirpath
    dirs = os.listdir(path)
    for f in dirs:
        fullpath = path + "/" + f
        extract_zip(fullpath)

def extract_zip(input_zip):
    """ stream a .gz file into the dump list for further processing
    :param input_zip: full filesystem path (local) to the .gz file
    """
    with gzip.open(input_zip, 'r') as fin:
        for line in fin:
            dump.append(line)

def url(target):
    """ search the downloaded logs for lines whose domain matches the string passed
    :param target: string used to build the regex for comparison
    :rtype : matching log lines printed to STDOUT
    """
    print "openDNS url query: " + target
    regex = re.compile(target)
    download()
    prefix = get_prefix()
    extract_zip_stdout(LOCAL_PATH + prefix)
    # pull the domain field from each comma-separated log line
    for i in dump:
        entry_elements = i.split(",")
        target_url = entry_elements[8]
        result_url = regex.search(target_url)
        # print the line if the domain matched
        if result_url:
            print i.rstrip()
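
# Assumed log line layout (illustrative only -- the surrounding fields are inferred
# from the indexing used in this script, not taken from openDNS documentation):
#   "2016-06-05 12:00:00","lab-host","lab-host","192.0.2.10","198.51.100.7",
#   "Allowed","1 (A)","NOERROR","example.com.","Search Engines"
# i.e. after split(','), index 5 holds the action, index 6 the query type and
# index 8 the requested domain.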

def stats():
    """ examples of stats which can be gleaned from the S3 logs
    TODO - other stats to consider, such as: total requests, top domains, additional
    request types (DNS request types or security categories: botnet, phishing, etc.)
    :rtype : lines of stats printed to STDOUT
    """
    print "openDNS stats"
    download()
    prefix = get_prefix()
    extract_zip_stdout(LOCAL_PATH + prefix)
    # get pertinent information from lines
    allowed = 0
    blocked = 0
    ipv4 = 0
    ipv6 = 0
    blockedsites = list()
    badstuff = dict()
    for i in dump:
        entry_elements = i.split(",")
        action = entry_elements[5]
        # print "action: " + action
        if action.startswith('\"Allo'):
            allowed += 1
        else:
            blocked += 1
            blockedsites.append(entry_elements[8])
            # count blocked domains in a dict: domain => hit count
            if entry_elements[8] in badstuff:
                badstuff[entry_elements[8]] += 1
            else:
                badstuff[entry_elements[8]] = 1
        # get IP version information
        result_ipv4 = regex_ipv4.search(entry_elements[6])
        result_ipv6 = regex_ipv6.search(entry_elements[6])
        if result_ipv4:
            ipv4 += 1
        if result_ipv6:
            ipv6 += 1
    print "allowed: " + str(allowed)
    print "blocked: " + str(blocked)
    print "IPv4: " + str(ipv4)
    print "IPv6: " + str(ipv6) + "\n"
    sorted_bad = sorted(badstuff.items(), key=operator.itemgetter(1))
    for i in sorted_bad:
        print i
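
# sorted() with operator.itemgetter(1) orders the (domain, count) pairs by count,
# ascending, so the most-blocked domains print last; for a conventional
# "top blocked" list, pass reverse=True, e.g.:
#   sorted(badstuff.items(), key=operator.itemgetter(1), reverse=True)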

def get_date():
    """ get today's date
    :rtype : today_date
    """
    from datetime import datetime
    i = datetime.now()
    today_date = i.strftime('%Y-%m-%d')
    return today_date

def get_prefix():
    """ get prefix for local directory to match the S3 bucket path
    :rtype : complete_prefix
    """
    today = get_date()
    complete_prefix = 'dnslogs/' + today
    return complete_prefix
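
# e.g. on 2016-06-05 this yields 'dnslogs/2016-06-05', which is used both as the
# S3 key prefix and as the local directory path under LOCAL_PATH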

def help_me():
    """ print help
    :rtype : void
    """
    print "Help - s3query.py"
    print ("Ensure the computer has network connectivity, Python 2.7 and an "
           "AWS API key/secret available for S3 read operations\n")

def usage():
    """ print usage
    :rtype : void
    """
    print "Usage information - no args passed"
    print 'Usage: ' + sys.argv[0] + ' -h - prints help_me'
    print 'Usage: ' + sys.argv[0] + ' -s - daily statistics'
    print 'Usage: ' + sys.argv[0] + ' -d - download today\'s logs'
    print 'Usage: ' + sys.argv[0] + ' -u <string> - URL or domain query'

def download():
    """ download today's logs to the local filesystem. TODO: take a date as a param, provide feedback
    :rtype : void - files dropped on the filesystem
    """
    print "downloading today's logs: " + get_date()
    # list objects in the bucket under today's prefix
    conn = boto.connect_s3()
    try:
        buck = conn.get_bucket(MY_BUCKET)
        prefix = get_prefix()
        bucklist = buck.list(prefix)
        for l in bucklist:
            # get today's files, skipping any already on disk
            key_string = str(l.key)
            eval_dir_path(LOCAL_PATH + key_string)
            if not os.path.exists(LOCAL_PATH + key_string):
                l.get_contents_to_filename(LOCAL_PATH + key_string)
    except IndexError, i:
        print 'IndexError - "%s"' % str(i)
    except boto.exception.S3ResponseError, i:
        print "Check your S3 permissions - " + str(i)

def main():
    # Get the total number of args passed
    total = len(sys.argv)
    if total == 1:
        usage()
        sys.exit()
    if total > 1:
        # get args and do stuff
        if (str(sys.argv[1])).startswith('-h'):
            help_me()
        if (str(sys.argv[1])).startswith('-s'):
            stats()
        if (str(sys.argv[1])).startswith('-d'):
            download()
        if (str(sys.argv[1])).startswith('-u'):
            url(str(sys.argv[2]))
        sys.exit()


if __name__ == '__main__':
    main()
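
# Example invocations (illustrative; assumes AWS credentials are configured and
# LOCAL_PATH / MY_BUCKET have been filled in above):
#   ./s3query.py -d                 # download today's logs from S3
#   ./s3query.py -s                 # print daily allow/block and IPv4/IPv6 stats
#   ./s3query.py -u example.com     # print log lines whose domain matches example.com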