Skip to content

Instantly share code, notes, and snippets.

@ilyasahsan123
Last active January 3, 2019 16:21
Show Gist options
  • Save ilyasahsan123/636cfd83513f37e705c81e6072ce2502 to your computer and use it in GitHub Desktop.
Save ilyasahsan123/636cfd83513f37e705c81e6072ce2502 to your computer and use it in GitHub Desktop.
import json
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
if __name__ == "__main__":
# create pipeline
pipeline_options = PipelineOptions(flags=None)
pipeline_options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=pipeline_options)
# subscribe message from cloud pub/sub
messages = p | beam.io.ReadFromPubSub(subscription="projects/<project_name>/subscriptions/<subscription_name>")
# batch every 5 second
batch = (messages | beam.WindowInto(window.FixedWindows(5, 0)))
# convert from string to json
output = (batch | 'to_json' >> beam.Map(lambda x: json.loads(x)))
# configure project_id, dataset, table, and columns
table_spec = '<project_id>:<schema>.<table_name>'
table_schema = 'nomor_urut:INTEGER, nama:STRING, motto:STRING'
# store data to BigQuery
output | beam.io.WriteToBigQuery(table_spec,schema=table_schema)
# Waits until the pipeline finishes and returns the final status.
result= p.run()
result.wait_until_finish()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment