Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Last active September 15, 2019 09:15
Show Gist options
  • Save gxercavins/8e759a41421e93ab77d46c72e1adf808 to your computer and use it in GitHub Desktop.
Save gxercavins/8e759a41421e93ab77d46c72e1adf808 to your computer and use it in GitHub Desktop.
SO Question 57940102
import argparse, json, logging
import apache_beam as beam
from apache_beam.io.gcp import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
# Placeholder identifiers passed as ``project=`` / ``dataset=`` to the
# BigQuery sinks built in ``run``; replace with real values before running.
PROJECT = "PROJECT_ID"
BQ_DATASET = "DATASET_NAME"

# Table schemas written as comma-separated ``name:TYPE`` pairs.
# Raw table: the event time plus the undecoded JSON payload string.
SCHEMA_RAW_TABLE = (
    'evt_time:TIMESTAMP,'
    'payload:STRING'
)
# Parsed table: the event time plus the two keys taken from the payload.
SCHEMA_PARSED_TABLE = (
    'evt_time:TIMESTAMP,'
    'key1:STRING,'
    'key2:STRING'
)
def add_timestamps(e, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    """Turn a message-like dict into a flat ``evt_time`` / ``payload`` row.

    Args:
        e: Mapping with a ``'data'`` entry that supports ``.decode()`` and an
            ``'attributes'`` mapping that contains ``'evt_time'``.
        timestamp: Declared with Beam's ``DoFn.TimestampParam`` as default;
            the value is not used by this function.
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        A dict with two keys: ``'evt_time'`` (copied from the attributes) and
        ``'payload'`` (the decoded ``'data'`` value).
    """
    # Decode first, then read the attribute, matching the original lookup order.
    body_text = e['data'].decode()
    attributes = e['attributes']
    return {'evt_time': attributes['evt_time'], 'payload': body_text}
def parse_payload(element, timestamp=beam.DoFn.TimestampParam, *args, **kwargs):
    """Replace the JSON ``payload`` field of a row with its decoded contents.

    Args:
        element: Row dict containing a ``'payload'`` entry holding a JSON
            document. The input dict is not modified; a shallow copy is used.
        timestamp: Declared with Beam's ``DoFn.TimestampParam`` as default;
            the value is not used by this function.
        *args: Accepted and ignored.
        **kwargs: Accepted and ignored.

    Returns:
        A copy of ``element`` without ``'payload'``, updated with the
        key/value pairs decoded from the payload JSON.
    """
    flattened = element.copy()
    raw_json = flattened.pop('payload')
    # Decoded keys are merged on top of the remaining row fields.
    flattened.update(json.loads(raw_json))
    return flattened
def run(argv=None):
    """Build the sample pipeline, run it, and block until it finishes.

    One hard-coded message is created in memory, converted to a row by
    ``add_timestamps`` and then written to two BigQuery tables: the row as-is
    to ``test_raw`` and the row with its JSON payload expanded by
    ``parse_payload`` to ``test_parsed``.

    Args:
        argv: Optional list of command-line arguments. No flags are defined
            here, so every argument is forwarded to ``PipelineOptions``.
    """
    # The parser declares no arguments of its own; only the leftover
    # (unrecognised) arguments are used.
    _, beam_args = argparse.ArgumentParser().parse_known_args(argv)

    options = PipelineOptions(beam_args)
    # NOTE(review): presumably needed so module-level imports/constants are
    # available to the mapped functions on workers -- see Beam SetupOptions.
    options.view_as(SetupOptions).save_main_session = True

    pipeline = beam.Pipeline(options=options)

    # Single in-memory message: JSON bytes under 'data', event time under
    # 'attributes'.
    sample_payload = json.dumps({"key1": "val1", "key2": "val2"}).encode()
    sample_messages = [{
        'data': sample_payload,
        'attributes': {'evt_time': '2019-09-14T22:12:43.323546Z'},
    }]

    rows = (
        pipeline
        | "read_sub" >> beam.Create(sample_messages)
        | "add_timestamps" >> beam.Map(add_timestamps)
    )

    # Branch 1: store the row with the payload still as a JSON string.
    rows | "raw_stream_to_bq" >> bigquery.WriteToBigQuery(
        project=PROJECT,
        dataset=BQ_DATASET,
        table="test_raw",
        schema=SCHEMA_RAW_TABLE,
    )

    # Branch 2: expand the payload into columns, then store the result.
    parsed_rows = rows | "parse" >> beam.Map(parse_payload)
    parsed_rows | "parsed_stream_to_bq" >> bigquery.WriteToBigQuery(
        project=PROJECT,
        dataset=BQ_DATASET,
        table="test_parsed",
        schema=SCHEMA_PARSED_TABLE,
    )

    pipeline.run().wait_until_finish()
if __name__ == '__main__':
    # Script entry point: set the root logger to INFO, then start the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment