example of OpenDNS/S3 file access/review
#!/usr/bin/env python | |
# Quick script demonstrating OpenDNS log access via S3.
# TODO: accept dates (ranges) as input; report hash values from S3 and from the
# local filesystem, which are useful for investigative support.
# bschmitt - 20160605 | |
import re | |
import os | |
import gzip | |
import boto | |
import sys | |
import operator | |
# global vars/const
# LOCAL_PATH is concatenated directly with S3 key names by its users
LOCAL_PATH = ''  # full filesystem path to base download directory
MY_BUCKET = ''  # simple bucket name
# Raw strings: "\(" is not a valid string escape, so a plain literal only works
# by accident. The patterns match the literal text "(A)" and "(AAAA)".
regex_ipv4 = re.compile(r"\(A\)")
regex_ipv6 = re.compile(r"\(AAAA\)")
dump = list()  # log lines collected by extract_zip
def eval_dir_path(full_path):
    """ make sure the directory that will hold full_path exists
    :param full_path: filesystem path to a file; only its directory part is used
    :rtype : void
    """
    parent = os.path.dirname(full_path)
    # a bare file name has no directory part, and os.makedirs('') would raise
    if not parent or os.path.isdir(parent):
        return
    try:
        os.makedirs(parent)
    except OSError:
        # tolerate the directory appearing between the check and the call
        if not os.path.isdir(parent):
            raise
def extract_zip_stdout(dirpath):
    """ pass every .gz file found directly inside dirpath to extract_zip
    :param dirpath: filesystem path to target directory
    :rtype : void - extract_zip is called once per .gz file
    """
    # the directory is absent when nothing was downloaded for it
    if not os.path.isdir(dirpath):
        return
    # sorted() keeps the processing order independent of os.listdir ordering
    for name in sorted(os.listdir(dirpath)):
        fullpath = os.path.join(dirpath, name)
        # skip sub-directories and files that are not gzip archives
        if name.endswith('.gz') and os.path.isfile(fullpath):
            extract_zip(fullpath)
def extract_zip(input_zip):
    """ read a .gz file line by line into the module-level dump list
    :param input_zip: full file system path (local) to .gz file
    :rtype : void - dump grows by one entry per line of the file
    """
    with gzip.open(input_zip, 'r') as gz_lines:
        dump.extend(gz_lines)
def url(target):
    """ download today's logs and print the lines whose URL field matches
    :param target: regular expression (string) searched for in the ninth
                   comma-separated field of each log line
    :rtype : matching lines as STDOUT
    """
    print("openDNS url query: " + target)
    regex = re.compile(target)
    download()
    extract_zip_stdout(LOCAL_PATH + get_prefix())
    for line in dump:
        fields = line.split(",")
        # blank or truncated lines have no URL field; skip instead of crashing
        if len(fields) < 9:
            continue
        # check if URL was hit and if so, print
        if regex.search(fields[8]):
            print(line.rstrip())
def stats():
    """ examples of stats which can be gleaned from the S3 logs
    Downloads today's logs, then prints the number of allowed and blocked
    requests, the number of (A) and (AAAA) queries, and finally one
    (site, count) pair per blocked site in ascending order of count.
    TODO - other stats to consider such as: total requests, top domains,
    additional request types (either DNS requests or security categories
    (botnet, phishing, etc.)
    :rtype : lines of stats as STDOUT
    """
    print("openDNS stats")
    download()
    extract_zip_stdout(LOCAL_PATH + get_prefix())
    allowed = 0
    blocked = 0
    ipv4 = 0
    ipv6 = 0
    badstuff = dict()  # blocked site -> number of blocked requests
    for line in dump:
        fields = line.split(",")
        # blank or truncated lines lack the fields read below; skip them
        if len(fields) < 9:
            continue
        # anything that is not an "Allo..." action is counted as blocked
        if fields[5].startswith('\"Allo'):
            allowed += 1
        else:
            blocked += 1
            site = fields[8]
            badstuff[site] = badstuff.get(site, 0) + 1
        # get IP version information
        if regex_ipv4.search(fields[6]):
            ipv4 += 1
        if regex_ipv6.search(fields[6]):
            ipv6 += 1
    print("allowed: " + str(allowed))
    print("blocked: " + str(blocked))
    print("IPv4: " + str(ipv4))
    print("IPv6: " + str(ipv6) + "\n")
    for entry in sorted(badstuff.items(), key=operator.itemgetter(1)):
        print(entry)
def get_date():
    """ build today's date as a %Y-%m-%d string
    NOTE(review): uses the local clock - confirm this matches the date used
    in the bucket's key names
    :rtype : today_date
    """
    from datetime import datetime
    return datetime.now().strftime('%Y-%m-%d')
def get_prefix():
    """ build the 'dnslogs/<date>' prefix shared by the S3 keys and the local directory
    :rtype : complete_prefix
    """
    return 'dnslogs/' + get_date()
def help_me():
    """ print help
    :rtype : void
    """
    print("Help - s3query.py")
    # adjacent literals are joined as-is, so the separating space is written
    # explicitly at the end of the first piece
    print("Ensure computer has network connectivity, python 2.7 or greater "
          "and AWS API key/secret available for S3 read operations\n")
def usage():
    """ print the supported command line options, one per line
    :rtype : void
    """
    print("Usage information - no args passed")
    prefix = 'Usage: ' + sys.argv[0]
    for option in (' -h - prints help_me',
                   ' -s - daily statistics',
                   " -d - download today's logs",
                   ' -u - URL or domain query'):
        print(prefix + option)
def download():
    """ download today's logs to local filesystem. TODO: take date as param, provide feedback
    Every key listed under today's prefix in MY_BUCKET is written to
    LOCAL_PATH + <key name>; files that already exist locally are skipped.
    IndexError and S3ResponseError are reported on STDOUT instead of raised.
    :rtype : void - files dropped on filesystem
    """
    print("downloading todays logs: " + get_date())
    # list objects in a bucket
    conn = boto.connect_s3()
    try:
        buck = conn.get_bucket(MY_BUCKET)
        for key in buck.list(get_prefix()):
            # get today's files
            local_file = LOCAL_PATH + str(key.key)
            eval_dir_path(local_file)
            if not os.path.exists(local_file):
                key.get_contents_to_filename(local_file)
    except IndexError as err:
        # '%s' (not '&s') is needed for the % operator to insert the message
        print('IndexError - "%s"' % str(err))
    except boto.exception.S3ResponseError as err:
        print("Check your S3 permissions - " + str(err))
def main():
    """ dispatch on the first command line argument, then exit
    -h prints help, -s prints statistics, -d downloads today's logs and
    -u <pattern> searches the logs. Usage is printed when no argument is
    given, when -u lacks its pattern, or when the option is not recognised.
    :rtype : void - always ends with sys.exit
    """
    if len(sys.argv) == 1:
        usage()
        sys.exit()
    # get args and do stuff
    option = str(sys.argv[1])
    if option.startswith('-h'):
        help_me()
    elif option.startswith('-s'):
        stats()
    elif option.startswith('-d'):
        download()
    elif option.startswith('-u'):
        # the query needs a pattern; without it sys.argv[2] would raise IndexError
        if len(sys.argv) < 3:
            usage()
            sys.exit(1)
        url(str(sys.argv[2]))
    else:
        usage()
    sys.exit()
# run only when executed as a script, so that importing the module has no side effects
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment