-
-
Save mechanicalgirl/2f2bcfab867aafca146765ed7bbf4c4c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from flask import Flask, request | |
from google.cloud import bigquery | |
# Flask application object; events() uses its response_class to build replies.
app = Flask(__name__)

# Column layout of the events table. 'video_insights' is a repeated record
# (zero or more nested rows per event); 'categories' is a repeated string.
schema = [
    bigquery.SchemaField('timestamp', 'TIMESTAMP', mode='NULLABLE'),
    bigquery.SchemaField('event_type', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('event_id', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('has_insights', 'BOOLEAN', mode='NULLABLE'),
    bigquery.SchemaField(
        'video_insights',
        'RECORD',
        mode='REPEATED',
        fields=[
            bigquery.SchemaField('video_id', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('video_duration', 'INTEGER', mode='NULLABLE'),
        ],
    ),
    bigquery.SchemaField('categories', 'STRING', mode='REPEATED'),
]

# Destination dataset and table used by create_table() and insert_entries().
dataset = 'my_dataset'
table_name = 'my_events_table'
def _json_error(message, status):
    """Build a JSON response with ``message`` under the 'error' key."""
    return app.response_class(
        response=json.dumps({'error': message}),
        status=status,
        mimetype='application/json',
    )


def events(request):
    """Handle a POST of events and stream them into BigQuery.

    The request body must be JSON of the form ``{"events": [ {...}, ... ]}``.

    Args:
        request: The incoming Flask request object.

    Returns:
        A Flask response: status 400 with a JSON error body when the payload
        cannot be parsed or has no list under 'events'; status 500 with a
        JSON error body when the rows could not be inserted; otherwise an
        empty body with status 200.
    """
    try:
        payload = request.get_json()
        event_list = payload['events']
    except Exception as e:
        # str(e) rather than e.message: Python 3 exceptions have no
        # ``message`` attribute, so the old code failed inside this handler.
        return _json_error(str(e), 400)
    if not isinstance(event_list, list):
        return _json_error("'events' must be a list", 400)

    try:
        create_table()
    except Exception as e:
        # Best effort: creation is attempted on every request and failure
        # (for example because the table is already there) must not block
        # the insert.
        print("ERROR", e)

    event_rows = [construct_entry(payload, event) for event in event_list]

    # An empty batch has nothing to insert; treat it as success without
    # calling out to BigQuery.
    inserted = True
    if event_rows:
        try:
            inserted = insert_entries(event_rows)
        except Exception as e:
            print("Error on inserting entries: %s" % e)
            inserted = False

    if not inserted:
        # Report the failure to the caller instead of terminating the
        # server process (the old code called sys.exit() here).
        return _json_error('failed to insert events', 500)

    return app.response_class(response='', status=200)
def create_table():
    """Create the events table in BigQuery and return its full table ID.

    The table is addressed by the module-level ``dataset`` and ``table_name``
    and is created with the module-level ``schema``. Exceptions raised by the
    BigQuery client are not caught here and propagate to the caller.

    Returns:
        The ``full_table_id`` of the table object returned by the client.
    """
    bq_client = bigquery.Client()
    target_ref = bq_client.dataset(dataset).table(table_name)
    created = bq_client.create_table(bigquery.Table(target_ref, schema=schema))
    print("Created table {}".format(created.full_table_id))
    return created.full_table_id
def construct_entry(payload, event):
    """Convert one incoming event dict into a row dict for the events table.

    The keys of the returned dict (and of each nested 'video_insights' item)
    use the same names as the fields declared in the module-level ``schema``,
    so the row can be matched against that schema on insert.

    Args:
        payload: The full request payload. Currently unused; kept so the
            call signature stays stable.
        event: A dict describing a single event. Missing keys fall back to
            defaults ('' for strings, 0 for the duration, False for
            'has_insights', None for 'timestamp', [] for repeated fields).

    Returns:
        A dict with the keys 'timestamp', 'event_type', 'event_id',
        'categories', 'has_insights' and 'video_insights'.
    """
    # ``or []`` covers both a missing key and an explicit null/empty value.
    insights_list = [
        {
            'video_id': insight.get('video_id', ''),
            'video_duration': insight.get('video_duration', 0),
        }
        for insight in (event.get('video_insights') or [])
    ]
    return {
        'timestamp': event.get('timestamp', None),
        'event_type': event.get('event_type', ''),
        'event_id': event.get('event_id', ''),
        'categories': event.get('categories', []),
        'has_insights': event.get('has_insights', False),
        'video_insights': insights_list,
    }
def insert_entries(event_rows):
    """Stream ``event_rows`` into the configured BigQuery table.

    The destination is the module-level ``dataset`` / ``table_name`` and rows
    are matched against the module-level ``schema``.

    Args:
        event_rows: A list of row dicts, as produced by construct_entry().

    Returns:
        True when there was nothing to insert, or when the insert call
        completed and reported no per-row errors; False when the call raised
        or reported errors for any row.
    """
    if not event_rows:
        # Nothing to send; skip the API round trip.
        return True

    client = bigquery.Client()
    table_ref = client.dataset(dataset).table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    try:
        errors = client.insert_rows(table, event_rows)
    except Exception as e:
        print("Error: %s" % str(e))
        return False

    if errors:
        # insert_rows reports rejected rows through its return value rather
        # than by raising, so an unchecked result hides failed inserts.
        print("Error: %s" % errors)
        return False
    return True
if __name__ == '__main__':
    # Local development entry point: build the app, expose events() at
    # POST /events, and start Flask's built-in server in debug mode.
    app = Flask(__name__)
    register_events_route = app.route('/events', methods=['POST'])
    register_events_route(lambda: events(request))
    app.run(debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment