@boseoladipo
Last active July 16, 2020 10:18
Pipeline for loading Exponea data
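import apache_beam as beam

# NOTE: `p` (the beam.Pipeline), `source_query`, `FetchEventsFn`,
# `destination_table` and `table_schema` are not shown in this snippet
# and must be defined elsewhere.

# Request template for the Exponea API. The cookie is left empty here,
# presumably to be filled in per customer inside FetchEventsFn.
payload = {
    "customer_ids": {"cookie": ""},
    "event_types": [
        "campaign",
        "first_session",
        "session_start",
        "session_end",
        "reserve_car",
        "view item",
        "view_car",
        "view_category",
        "add item to cart",
        "preorder",
        "autoprenuer_signup",
        "book_inspection",
        "set_city",
        "set_location",
    ],
}

# Read the source rows from BigQuery, then request events for each row.
events = (
    p
    | 'Input: ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(
        query=source_query,
        use_standard_sql=True))
    | 'Transform: RequestData' >> beam.ParDo(FetchEventsFn(payload))
)

# Write the events to BigQuery, replacing any existing table contents.
events | 'Output: WriteToBigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
    destination_table,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
)

result = p.run()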
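
The snippet passes `payload` to `FetchEventsFn` but does not show that class. As a rough sketch only, the per-row logic it might delegate to could look like the following. Everything here is an assumption for illustration, not the author's implementation: the helper names `build_payload` and `fetch_events`, the injected `fetch` callable (standing in for the HTTP request to Exponea), the `cookie` column name in the BigQuery row, and the `data` key in the response.

```python
import copy
from typing import Callable, Iterator


def build_payload(template: dict, cookie: str) -> dict:
    """Return a copy of the request template with the cookie filled in.

    A deep copy is used so the shared template is never mutated.
    """
    payload = copy.deepcopy(template)
    payload["customer_ids"]["cookie"] = cookie
    return payload


def fetch_events(row: dict, template: dict,
                 fetch: Callable[[dict], dict]) -> Iterator[dict]:
    """Yield one flat record per event returned for the row's cookie.

    `fetch` is a hypothetical callable that sends the payload to the API
    and returns the decoded JSON response as a dict.
    """
    cookie = row["cookie"]  # assumed column name in the source query
    response = fetch(build_payload(template, cookie))
    # Assumed response shape: {"data": [{"type": ..., "timestamp": ...}]}
    for event in response.get("data", []):
        yield {
            "cookie": cookie,
            "type": event.get("type"),
            "timestamp": event.get("timestamp"),
        }
```

Under these assumptions, a `beam.DoFn` subclass could store the template in `__init__` and have `process(self, row)` return `fetch_events(row, self.template, some_fetch)`. Injecting `fetch` keeps the logic testable without network access.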