Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martin-loetzsch/c5a2e679aab12978d032deb0d3ea24a6 to your computer and use it in GitHub Desktop.
Save martin-loetzsch/c5a2e679aab12978d032deb0d3ea24a6 to your computer and use it in GitHub Desktop.
import base64
import functools
import json
import geoip2.database
from google.cloud import bigquery
from ua_parser import user_agent_parser
@functools.lru_cache(maxsize=None)
def get_geo_db():
    """Return the shared GeoLite2 city database reader.

    ``lru_cache`` on a zero-argument function means the ``.mmdb`` file is
    opened on the first call only; every later call in the same process gets
    the very same ``Reader`` object back.

    NOTE(review): the path is relative to the process working directory,
    not to this file.
    """
    database_path = './GeoLite2-City_20200307/GeoLite2-City.mmdb'
    reader = geoip2.database.Reader(database_path)
    return reader
def extract_geo_data(ip):
    """Does a geo lookup for an IP address.

    Args:
        ip: IP address string to look up in the GeoLite2 city database.

    Returns:
        dict with the keys ``country_iso_code``, ``country_name``,
        ``subdivisions_iso_code``, ``subdivisions_name`` and ``city_name``.
        Individual values are ``None`` when the database lacks that detail.
        All values are ``None`` when the address is not in the database at
        all (e.g. private/reserved ranges), so one unknown visitor address
        no longer aborts the whole batch.

    Other lookup errors (e.g. a malformed IP string) still propagate.
    """
    # Local import keeps this block self-contained; the package is already
    # a dependency of the file via ``geoip2.database``.
    from geoip2.errors import AddressNotFoundError

    try:
        response = get_geo_db().city(ip)
    except AddressNotFoundError:
        # Same shape as a successful lookup, just with no data.
        return {
            'country_iso_code': None,
            'country_name': None,
            'subdivisions_iso_code': None,
            'subdivisions_name': None,
            'city_name': None,
        }

    # most_specific picks the finest-grained subdivision available
    # (e.g. a county over a state).
    subdivision = response.subdivisions.most_specific
    return {
        'country_iso_code': response.country.iso_code,
        'country_name': response.country.name,
        'subdivisions_iso_code': subdivision.iso_code,
        'subdivisions_name': subdivision.name,
        'city_name': response.city.name,
    }
def parse_user_agent(user_agent_string):
    """Extract browser, OS and device information from a user-agent string.

    Runs ``user_agent_parser.Parse`` once and flattens the nested result into
    six top-level columns.

    Args:
        user_agent_string: raw ``User-Agent`` header value.

    Returns:
        dict with ``browser_family``, ``browser_version``, ``os_family``,
        ``os_version``, ``device_brand`` and ``device_model``. Only the
        *major* version number is kept for browser and OS.
    """
    parsed = user_agent_parser.Parse(user_agent_string)
    # (output column, section of the parser result, key within that section)
    field_map = (
        ('browser_family', 'user_agent', 'family'),
        ('browser_version', 'user_agent', 'major'),
        ('os_family', 'os', 'family'),
        ('os_version', 'os', 'major'),
        ('device_brand', 'device', 'brand'),
        ('device_model', 'device', 'model'),
    )
    return {column: parsed[section][key] for column, section, key in field_map}
@functools.lru_cache(maxsize=None)
def _get_bigquery_client():
    """Create the BigQuery client once per process and reuse it.

    Building the client reads the service-account key file. Caching it (the
    same pattern ``get_geo_db`` uses for the GeoIP reader) lets later
    invocations in the same process skip that work.
    """
    return bigquery.Client.from_service_account_json('BigQuery-6e433016ee6b.json')


def _enrich_message(message):
    """Replace raw tracking fields in *message* with derived columns, in place.

    - ``ua``: if non-empty, parsed into browser/OS/device columns. The key is
      always removed.
    - ``ip``: geo-located into country/subdivision/city columns and then
      removed, so the raw address is not forwarded.
    - ``query``: a non-empty ``{param: value}`` mapping becomes a list of
      ``{'param': ..., 'value': ...}`` records.

    Missing ``ua``/``query`` keys are tolerated. ``ip`` is required.

    Returns:
        The same (mutated) ``message`` dict, for convenience.
    """
    user_agent = message.pop('ua', None)
    if user_agent:
        message.update(parse_user_agent(user_agent))

    message.update(extract_geo_data(message.pop('ip')))

    query = message.get('query')
    if query:
        message['query'] = [{'param': param, 'value': value}
                            for param, value in query.items()]
    return message


def lambda_handler(event, context):
    """Preprocess a raw event in the Kinesis queue, then write it to BigQuery and S3.

    Each incoming record's ``data`` is base64-encoded JSON. It is decoded,
    enriched via ``_enrich_message``, inserted into BigQuery, and returned
    re-encoded in ``records`` with ``result: 'Ok'`` so the caller can deliver
    the transformed payload onward.

    Args:
        event: dict with a ``records`` list. Each record has ``recordId`` and
            base64 ``data``.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode``, ``body`` and the transformed ``records``.

    Raises:
        Exception: if BigQuery reports row insertion errors. The message is
            the JSON-encoded error list.
    """
    output_records = []
    rows_for_bigquery = []
    for record in event['records']:
        message = _enrich_message(json.loads(base64.b64decode(record['data'])))
        rows_for_bigquery.append(message)
        output_records.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(json.dumps(message).encode('utf-8')).decode('utf-8'),
        })

    # For an empty batch, skip the BigQuery calls entirely rather than
    # sending an insert request with no rows.
    if rows_for_bigquery:
        bq_client = _get_bigquery_client()
        table = bq_client.get_table(
            bq_client.dataset('server_side_tracking').table('project_a_website_events'))
        errors = bq_client.insert_rows(table, rows_for_bigquery)
        if errors:
            raise Exception(json.dumps(errors))

    return {
        "statusCode": 200,
        "body": json.dumps('OK'),
        "records": output_records,
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment