Last active
August 8, 2018 15:12
-
-
Save craigderington/960bf42a53ab150292a6d37750bf6f72 to your computer and use it in GitHub Desktop.
Python script for testing various Udger parsing functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import csv | |
import xml.etree.ElementTree as ET | |
import logging | |
import ipaddress | |
from udger import Udger | |
from db import db_session | |
from sqlalchemy import text | |
# Configure the root logger: records at DEBUG level and above are written to
# logs/udger_results.log as "LEVEL:timestamp message".
# NOTE(review): the log file is opened when basicConfig runs, so this
# presumably needs the logs/ directory to exist beforehand -- confirm.
logging.basicConfig(
    filename='logs/udger_results.log',
    level=logging.DEBUG,
    # The pattern used to end in "%(message)s)", which appended a stray ")"
    # to every line written to the log.
    format='%(levelname)s:%(asctime)s %(message)s',
)

# Single Udger instance shared by every function in this module; it is built
# from the /var/cache/udger/ path (presumably the Udger data directory).
udger = Udger('/var/cache/udger/')
def parse_ip(ip):
    """
    Look up an IP address with the shared Udger instance.

    Only the entries of the Udger result whose key contains the substring
    'ip' are kept.

    :param ip: the IP address to look up, as a string
    :return: dict of the kept entries; empty when Udger returns a falsy
        result or raises ValueError (the error is printed, not re-raised)
    """
    matches = {}

    try:
        lookup = udger.parse_ip(ip)
        if lookup:
            # keep just the IP-related fields of the Udger answer
            matches.update(
                (key, value) for key, value in lookup.items() if 'ip' in key
            )
    except ValueError as err:
        print('An error occurred: {}'.format(str(err)))

    return matches
def parse_ua(ua):
    """
    Parse a User Agent string with the shared Udger instance.

    :param ua: the user agent string to parse
    :return: dict holding a copy of every entry of the Udger result; empty
        when Udger returns a falsy result or raises ValueError (the error
        is printed, not re-raised)
    """
    parsed = {}

    try:
        lookup = udger.parse_ua(ua)
        if lookup:
            # copy every key/value pair of the Udger answer
            parsed.update(lookup.items())
    except ValueError as err:
        print('A parse error occurred: {}'.format(str(err)))

    return parsed
def write_log(result):
    """
    Write a log entry describing the outcome of a Udger IP lookup.

    - result is not a dict: nothing is logged.
    - result has no 'ip' or no 'ip_classification_code' entry: a WARNING is
      logged saying the parse was unsuccessful.
    - the classification code contains 'unrecognized': nothing is logged.
    - anything else: a DEBUG "NO-GO" entry is logged with the
      classification, city and country code.

    :param result: dict of Udger IP fields, e.g. the value returned by
        parse_ip()
    :return: None
    """
    if not isinstance(result, dict):
        return

    code = result.get('ip_classification_code')
    if 'ip' not in result or code is None:
        # This branch used to format result['ip'] even though it is only
        # reached when that key is missing, so an empty result (which
        # parse_ip() returns on failure) raised KeyError instead of being
        # logged.
        logging.warning('Udger IP parse for {} unsuccessful.'.format(
            result.get('ip', 'unknown IP')))
        return

    if 'unrecognized' in code:
        # Udger did not classify this address: nothing to report.
        return

    # We have a crawler or bot, log the visitor for site IQ. The detail
    # fields are read with .get() so one missing field cannot abort the run.
    logging.debug(
        'Visitor from {} received a NO-GO from Udger. IP Classified as {} from {} {}'.format(
            result['ip'],
            result.get('ip_classification'),
            result.get('ip_city'),
            result.get('ip_country_code'),
        )
    )
def earl_ips():
    """
    Run every IP address stored in the EARL visitors table through Udger.

    Each value is validated as an IPv4 address, looked up with parse_ip()
    and handed to write_log(). Values that raise ValueError are reported on
    the console and skipped. The number of addresses handled without error
    is printed at the end.

    :return: None
    """
    query = text('select ip from visitors order by id asc')
    rows = db_session.execute(query)
    handled = 0

    logging.debug('Starting log...')

    for record in rows:
        raw_ip = record[0]
        try:
            address = ipaddress.IPv4Address(raw_ip)
            print('IP: {}'.format(address))
            udger_result = parse_ip(str(address))
            print('Udger Result: {}'.format(udger_result))
            write_log(udger_result)
        except ValueError:
            print('The IP address: {} did not parse correctly.'.format(raw_ip))
        else:
            handled += 1

    print('Total Records: {}'.format(handled))
    logging.debug('Finished logging...')
def read_data(filepath):
    """
    Read an XML file and run each listed user agent string through Udger.

    For every 'user-agent' element found in the parsed document, the text
    of its 'String' child is passed to udger.parse_ua(). When the result's
    'ua' entry is not None, the 'ua' and 'ua_class' entries are printed and
    the entry counts as detected. A one-line summary is printed after the
    whole file has been handled.

    :param filepath: path of the XML file to read
    :return: None; all output goes to the console
    """
    processed = 0
    detected = 0

    try:
        doc = ET.parse(filepath)
        for entry in doc.findall('user-agent'):
            ua = entry.findtext('String')
            try:
                result = udger.parse_ua(ua)
            except ValueError:
                print('Could not parse the user agent string {}'.format(str(ua)))
                continue

            if result['ua'] is not None:
                print(result['ua'], result['ua_class'])
                detected += 1
            processed += 1
    except (IOError, ET.ParseError) as err:
        # ET.parse() raises ParseError (not IOError) on malformed XML; it
        # is reported the same way as an unreadable file instead of
        # escaping as an unhandled exception.
        print('File Error: {}. Aborting...'.format(str(err)))
    else:
        # Print the counters once, after the loop. They used to be printed
        # inside the per-entry try block, i.e. once per user agent.
        print('The file processed {} and detected {}'.format(processed, detected))
def main():
    """
    Run the program.

    First parses the 'user_agent' column of every row in
    data/MOCK_DATA.csv and prints each result, then checks the agents
    listed in data/allagents.xml. An IOError in either step is reported on
    the console instead of being raised.

    :return: None
    """
    csv_columns = ('ip', 'first_name', 'last_name', 'ip_addr', 'user_agent')

    try:
        with open('data/MOCK_DATA.csv', 'r') as csv_file:
            for record in csv.DictReader(csv_file, fieldnames=csv_columns):
                agent = record['user_agent']
                if agent == '':
                    # nothing to parse on this row
                    continue
                print(parse_ua(agent))
    except IOError:
        print('Could not read the file from the input file path.')

    try:
        # test against known list of agents from http://www.user-agents.org
        read_data('data/allagents.xml')
    except IOError:
        print('Could not read the XML file at the specified file path.')
# Run the checks only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment