# GitHub Gist by @mechanicalgirl (mechanicalgirl/2f2bcfab867aafca146765ed7bbf4c4c)
# Last active April 25, 2019 16:08 - 0 stars, 0 forks
import json
from flask import Flask, request
from google.cloud import bigquery
# Flask application object; `events` uses its response_class to build replies.
app = Flask(__name__)

# BigQuery schema for the events table: four nullable scalar columns, a
# repeated RECORD of per-video insights, and a repeated STRING of categories.
schema = [
    bigquery.SchemaField('timestamp', 'TIMESTAMP', mode='NULLABLE'),
    bigquery.SchemaField('event_type', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('event_id', 'STRING', mode='NULLABLE'),
    bigquery.SchemaField('has_insights', 'BOOLEAN', mode='NULLABLE'),
    bigquery.SchemaField(
        'video_insights',
        'RECORD',
        mode='REPEATED',
        fields=[
            bigquery.SchemaField('video_id', 'STRING', mode='NULLABLE'),
            bigquery.SchemaField('video_duration', 'INTEGER', mode='NULLABLE'),
        ],
    ),
    bigquery.SchemaField('categories', 'STRING', mode='REPEATED'),
]

# Destination dataset and table used by create_table() and insert_entries().
dataset = 'my_dataset'
table_name = 'my_events_table'
def events(request):
    """Handle a POST of events and insert them into BigQuery.

    The request body must be JSON of the form ``{"events": [...]}``. Each
    item is converted to a row with ``construct_entry`` and the batch is
    handed to ``insert_entries``.

    Args:
        request: The incoming Flask request object.

    Returns:
        A Flask response:
        * 400 with a JSON ``{"error": ...}`` body when the payload cannot be
          parsed, has no ``events`` key, or ``events`` is not a list.
        * 500 with a JSON ``{"error": ...}`` body when the insert fails.
        * An empty 200 response otherwise.
    """
    try:
        payload = request.get_json()
        incoming = payload['events']
        if not isinstance(incoming, list):
            raise TypeError("'events' must be a list")
    except Exception as e:
        # Python 3 exceptions have no `.message` attribute; str(e) is the
        # portable way to get the message.
        return app.response_class(
            response=json.dumps({'error': str(e)}),
            status=400,
            mimetype='application/json',
        )

    # Best-effort table creation: a failure here is logged and ignored
    # (presumably the common failure is that the table already exists).
    try:
        create_table()
    except Exception as e:
        print("ERROR", e)

    event_rows = [construct_entry(payload, item) for item in incoming]

    # Nothing to insert for an empty batch; treat it as success.
    if event_rows:
        try:
            inserted = insert_entries(event_rows)
        except Exception as e:
            print("Error on inserting entries: %s" % e)
            inserted = False
        if not inserted:
            # Report the failure to the caller instead of terminating the
            # server process or silently answering 200.
            return app.response_class(
                response=json.dumps({'error': 'failed to insert events'}),
                status=500,
                mimetype='application/json',
            )

    return app.response_class(response='', status=200)
def create_table():
    """Create the events table in BigQuery and return its full table id.

    Uses the module-level ``dataset``, ``table_name`` and ``schema``. Any
    exception raised by the BigQuery client propagates to the caller.
    """
    bq = bigquery.Client()
    reference = bq.dataset(dataset).table(table_name)
    created = bq.create_table(bigquery.Table(reference, schema=schema))
    full_id = created.full_table_id
    print("Created table {}".format(full_id))
    return full_id
def construct_entry(payload, event):
    """Build one BigQuery row dict from a single incoming event.

    The keys of the returned dict (and of each nested insight) use the field
    names declared in the module-level ``schema``, so the row can be matched
    against that schema when inserted.

    Args:
        payload: The full request payload. Unused, kept for interface
            compatibility with existing callers.
        event: A dict describing one event; every key is optional.

    Returns:
        A dict with the keys ``timestamp``, ``event_type``, ``event_id``,
        ``has_insights``, ``video_insights`` and ``categories``.
    """
    # `or []` covers both a missing key and an explicit null/empty value.
    insights_list = [
        {
            'video_id': insight.get('video_id', ''),
            'video_duration': insight.get('video_duration', 0),
        }
        for insight in event.get('video_insights') or []
    ]
    return {
        'timestamp': event.get('timestamp'),
        'event_type': event.get('event_type', ''),
        'event_id': event.get('event_id', ''),
        'has_insights': event.get('has_insights', False),
        'video_insights': insights_list,
        'categories': event.get('categories', []),
    }
def insert_entries(event_rows):
    """Insert rows into the events table.

    Uses the module-level ``dataset``, ``table_name`` and ``schema``.

    Args:
        event_rows: A list of row dicts, as produced by ``construct_entry``.

    Returns:
        True when every row was accepted (or there was nothing to insert),
        False when the insert call raised or reported per-row errors.
    """
    # Nothing to send; skip building a client and making an API call.
    if not event_rows:
        return True

    client = bigquery.Client()
    table_ref = client.dataset(dataset).table(table_name)
    table = bigquery.Table(table_ref, schema=schema)
    try:
        # insert_rows returns one mapping per row that failed to insert;
        # an empty result means every row was accepted.
        errors = client.insert_rows(table, event_rows)
    except Exception as e:
        print("Error: %s" % str(e))
        return False
    if errors:
        print("Error: %s" % str(errors))
        return False
    return True
if __name__ == '__main__':
    # Script entry point: build the app, expose `events` at POST /events,
    # and start Flask's server in debug mode.
    app = Flask(__name__)
    app.add_url_rule(
        '/events',
        view_func=lambda: events(request),
        methods=['POST'],
    )
    app.run(debug=True)
# (GitHub page footer: sign in on GitHub to comment on this gist.)