Created
April 15, 2020 14:39
-
-
Save martin-loetzsch/c5a2e679aab12978d032deb0d3ea24a6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import functools | |
import json | |
import geoip2.database | |
from google.cloud import bigquery | |
from ua_parser import user_agent_parser | |
@functools.lru_cache(maxsize=None)
def get_geo_db():
    """Open the GeoLite2 city database; the cached reader is reused on later calls."""
    database_path = './GeoLite2-City_20200307/GeoLite2-City.mmdb'
    return geoip2.database.Reader(database_path)
def extract_geo_data(ip):
    """Does a geo lookup for an IP address.

    Args:
        ip: The IP address to look up; passed unchanged to the reader's
            ``city()`` method.

    Returns:
        A dict with the keys ``country_iso_code``, ``country_name``,
        ``subdivisions_iso_code``, ``subdivisions_name`` and ``city_name``.
        Every value is ``None`` when the address is not in the database.
    """
    # Local import keeps this function self-contained with respect to the
    # file-level import list.
    from geoip2.errors import AddressNotFoundError

    try:
        response = get_geo_db().city(ip)
    except AddressNotFoundError:
        # Addresses missing from the database (e.g. private ranges) must not
        # abort processing of the whole batch; report them as unknown instead.
        return {
            'country_iso_code': None,
            'country_name': None,
            'subdivisions_iso_code': None,
            'subdivisions_name': None,
            'city_name': None,
        }

    subdivision = response.subdivisions.most_specific
    return {
        'country_iso_code': response.country.iso_code,
        'country_name': response.country.name,
        'subdivisions_iso_code': subdivision.iso_code,
        'subdivisions_name': subdivision.name,
        'city_name': response.city.name,
    }
def parse_user_agent(user_agent_string):
    """Split a user agent string into browser, OS and device fields.

    Returns a dict with the keys ``browser_family``, ``browser_version``,
    ``os_family``, ``os_version``, ``device_brand`` and ``device_model``.
    """
    parsed = user_agent_parser.Parse(user_agent_string)
    browser = parsed['user_agent']
    operating_system = parsed['os']
    device = parsed['device']

    fields = {}
    fields['browser_family'] = browser['family']
    fields['browser_version'] = browser['major']
    fields['os_family'] = operating_system['family']
    fields['os_version'] = operating_system['major']
    fields['device_brand'] = device['brand']
    fields['device_model'] = device['model']
    return fields
def lambda_handler(event, context):
    """Preprocess raw events from the Kinesis queue and write them to BigQuery.

    Each record's base64 encoded JSON payload is decoded, enriched with user
    agent and geo information, stripped of the raw ``ua`` and ``ip`` fields,
    and its ``query`` dict is turned into a list of
    ``{'param': ..., 'value': ...}`` entries. The transformed records are also
    returned to the caller (presumably the delivery stream that forwards them
    to S3 -- confirm against the deployment).

    Args:
        event: Invocation payload; ``event['records']`` is a list of dicts
            with a ``recordId`` and base64 encoded ``data``.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode``, ``body`` and ``records``; ``records``
        holds one entry per input record with ``recordId``, ``result`` and
        the re-encoded ``data``.

    Raises:
        Exception: If BigQuery reports errors for any inserted row; the
            message is the JSON encoded error list.
    """
    lambda_output_records = []
    rows_for_bigquery = []

    for record in event['records']:
        message = json.loads(base64.b64decode(record['data']))

        # extract browser, device, os; the raw user agent is dropped either
        # way, and a missing 'ua' key is treated like an empty one
        user_agent = message.pop('ua', None)
        if user_agent:
            message.update(parse_user_agent(user_agent))

        # geo lookup for ip address; the raw ip is not kept in the output
        message.update(extract_geo_data(message.pop('ip')))

        # update get parameters: dict -> list of param/value records
        query = message.get('query')
        if query:
            message['query'] = [{'param': param, 'value': value}
                                for param, value in query.items()]

        rows_for_bigquery.append(message)
        encoded_message = base64.b64encode(json.dumps(message).encode('utf-8'))
        lambda_output_records.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': encoded_message.decode('utf-8'),
        })

    # Only talk to BigQuery when there is something to insert; an empty batch
    # needs neither a client nor a request.
    if rows_for_bigquery:
        bq_client = bigquery.Client.from_service_account_json('BigQuery-6e433016ee6b.json')
        table = bq_client.get_table(
            bq_client.dataset('server_side_tracking').table('project_a_website_events'))
        errors = bq_client.insert_rows(table, rows_for_bigquery)
        if errors:
            raise Exception(json.dumps(errors))

    return {
        "statusCode": 200,
        "body": json.dumps('OK'),
        "records": lambda_output_records,
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment