Skip to content

Instantly share code, notes, and snippets.

@craigderington
Last active August 8, 2018 15:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save craigderington/960bf42a53ab150292a6d37750bf6f72 to your computer and use it in GitHub Desktop.
Save craigderington/960bf42a53ab150292a6d37750bf6f72 to your computer and use it in GitHub Desktop.
Python script for testing various Udger parsing functions
# -*- coding: utf-8 -*-
import csv
import xml.etree.ElementTree as ET
import logging
import ipaddress
from udger import Udger
from db import db_session
from sqlalchemy import text
# Configure the root logger: records at DEBUG and above are written to
# logs/udger_results.log as "LEVEL:timestamp message".
# The format string previously ended with a stray ")" that was emitted
# verbatim at the end of every log line.
logging.basicConfig(
    filename='logs/udger_results.log',
    level=logging.DEBUG,
    format='%(levelname)s:%(asctime)s %(message)s',
)

# Module-wide Udger instance shared by the parse helpers below; it is
# constructed with the directory /var/cache/udger/.
udger = Udger('/var/cache/udger/')
def parse_ip(ip):
    """
    Look up an IP address with Udger and keep the IP-related fields.

    :param ip: value handed to ``udger.parse_ip``
    :return: dict holding every entry of the Udger result whose key contains
             ``'ip'``; empty when the lookup yields nothing or raises
             ValueError (the error is printed, not re-raised)
    """
    ip_fields = {}

    try:
        lookup = udger.parse_ip(ip)
        if lookup:
            # copy over only the entries whose key mentions 'ip'
            ip_fields.update(
                (key, value) for key, value in lookup.items() if 'ip' in key
            )
    except ValueError as err:
        print('An error occurred: {}'.format(str(err)))

    return ip_fields
def parse_ua(ua):
    """
    Parse a user agent string with Udger.

    :param ua: value handed to ``udger.parse_ua``
    :return: dict copy of the Udger result; empty when the parse yields
             nothing or raises ValueError (the error is printed, not
             re-raised)
    """
    ua_fields = {}

    try:
        parsed = udger.parse_ua(ua)
        if parsed:
            # shallow copy of every key/value pair in the result
            ua_fields.update(parsed.items())
    except ValueError as err:
        print('A parse error occurred: {}'.format(str(err)))

    return ua_fields
def write_log(result):
    """
    Write a log entry describing the outcome of an Udger IP lookup.

    :param result: dict of IP fields (the shape ``parse_ip`` returns);
                   anything that is not a dict is ignored
    :return: None

    Behaviour by case:
      * no ``'ip'`` key (e.g. the empty dict of a failed lookup): a warning
        is logged;
      * ``ip_classification_code`` contains ``'unrecognized'``: nothing is
        logged;
      * otherwise: a debug NO-GO entry with classification, city and
        country code is logged.
    """
    if not isinstance(result, dict):
        return

    if 'ip' not in result:
        # The address is not available in this branch, so it must not be
        # read from the result: doing so raised KeyError and aborted the
        # caller's loop on the first failed lookup.
        logging.warning('Udger IP parse for unknown IP unsuccessful.')
        return

    if 'unrecognized' in result['ip_classification_code']:
        # Udger has no classification for this address; nothing to record.
        return

    # we have a crawler or bot, log the visitor for site IQ
    logging.debug(
        'Visitor from {} received a NO-GO from Udger. IP Classified as {} from {} {}'.format(
            result['ip'],
            result['ip_classification'],
            result['ip_city'],
            result['ip_country_code'],
        )
    )
def earl_ips():
    """
    Run every visitor IP stored in the EARL database through Udger.

    Selects the ``ip`` column of ``visitors`` ordered by ``id``, validates
    each value as an IPv4 address, looks it up with ``parse_ip`` and passes
    the result to ``write_log``. Values that fail validation are reported on
    stdout and skipped; only successfully processed rows are counted.

    :return: None
    """
    query = text('select ip from visitors order by id asc')
    visitor_rows = db_session.execute(query)
    processed = 0

    logging.debug('Starting log...')

    for record in visitor_rows:
        raw_ip = record[0]
        try:
            address = ipaddress.IPv4Address(raw_ip)
            print('IP: {}'.format(address))
            udger_result = parse_ip(str(address))
            print('Udger Result: {}'.format(udger_result))
            write_log(udger_result)
            processed += 1
        except ValueError:
            print('The IP address: {} did not parse correctly.'.format(raw_ip))

    print('Total Records: {}'.format(processed))
    logging.debug('Finished logging...')
def read_data(filepath):
    """
    Read an XML file of user agents and print what Udger detects for each.

    For every ``user-agent`` element found by ``findall`` the text of its
    ``String`` child is parsed with ``udger.parse_ua``. Agents whose ``ua``
    field is not None are printed and counted as detected. A single summary
    line is printed once all elements have been processed.

    :param filepath: path of the XML file to read
    :return: None; all results are printed to stdout

    A file that cannot be read or is not well-formed XML is reported on
    stdout and processing stops. A user agent that Udger rejects with
    ValueError is reported and left out of the processed count.
    """
    counter = 0
    detected = 0

    try:
        doc = ET.parse(filepath)

        for node in doc.findall('user-agent'):
            ua = node.findtext('String')
            try:
                result = udger.parse_ua(ua)
            except ValueError:
                print('Could not parse the user agent string {}'.format(str(ua)))
                continue

            if result['ua'] is not None:
                print(result['ua'], result['ua_class'])
                detected += 1
            counter += 1

        # Print the counters once, after the loop. The summary used to sit
        # inside the per-agent try block and was printed on every iteration.
        print('The file processed {} and detected {}'.format(counter, detected))
    except (IOError, ET.ParseError) as err:
        # ET.ParseError (malformed XML) is not an IOError, so it used to
        # escape this handler and crash the caller.
        print('File Error: {}. Aborting...'.format(str(err)))
def main():
    """
    Run the program.

    First parses the ``user_agent`` column of every row in
    data/MOCK_DATA.csv with ``parse_ua`` and prints each result, skipping
    rows whose user agent is the empty string. Then runs ``read_data`` over
    data/allagents.xml.

    :return: None
    """
    columns = ('ip', 'first_name', 'last_name', 'ip_addr', 'user_agent')

    try:
        with open('data/MOCK_DATA.csv', 'r') as csv_file:
            for record in csv.DictReader(csv_file, fieldnames=columns):
                agent = record['user_agent']
                if agent == '':
                    continue
                print(parse_ua(agent))
    except IOError:
        print('Could not read the file from the input file path.')

    try:
        # test against known list of agents from http://www.user-agents.org
        read_data('data/allagents.xml')
    except IOError:
        print('Could not read the XML file at the specified file path.')
# Script entry point: call main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment