@timhberry · Last active February 10, 2023 08:01
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Pub/Sub subscription in the format: projects/<your-project>/subscriptions/<subscription-name>
sub = ''

# BigQuery table in the format: project:dataset.table
# Create the table first with the matching schema (see the sketch below)
output_table = 'ab-academy-demo:btlearningpath.dflowtest'
schema = 'country:STRING,country_code:STRING,region:STRING,region_code:STRING,city:STRING,date:DATE,download_kbps:FLOAT,upload_kbps:FLOAT,total_tests:INTEGER,distance_miles:FLOAT'
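
# A minimal sketch of that table-creation step, assuming the google-cloud-bigquery
# client library and an already-existing btlearningpath dataset (both assumptions,
# not part of the original gist):
#
#   from google.cloud import bigquery
#   client = bigquery.Client(project='ab-academy-demo')
#   bq_schema = [
#       bigquery.SchemaField('country', 'STRING'),
#       bigquery.SchemaField('country_code', 'STRING'),
#       bigquery.SchemaField('region', 'STRING'),
#       bigquery.SchemaField('region_code', 'STRING'),
#       bigquery.SchemaField('city', 'STRING'),
#       bigquery.SchemaField('date', 'DATE'),
#       bigquery.SchemaField('download_kbps', 'FLOAT'),
#       bigquery.SchemaField('upload_kbps', 'FLOAT'),
#       bigquery.SchemaField('total_tests', 'INTEGER'),
#       bigquery.SchemaField('distance_miles', 'FLOAT'),
#   ]
#   client.create_table(bigquery.Table('ab-academy-demo.btlearningpath.dflowtest', schema=bq_schema))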


class CustomParsing(beam.DoFn):
    def process(self, element: bytes, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):
        """
        Simple processing function to parse the data
        """
        parsed = json.loads(element.decode("utf-8"))
        yield parsed
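
# Illustrative example of a Pub/Sub message body this DoFn can parse (field names
# follow the BigQuery schema above; the values are made up):
#
#   b'{"country": "United Kingdom", "country_code": "GB", "region": "England",
#      "region_code": "ENG", "city": "London", "date": "2023-02-10",
#      "download_kbps": 52341.5, "upload_kbps": 10212.3, "total_tests": 42,
#      "distance_miles": 1.2}'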

# replace <your-project-name>, <your-bucket-name> and <your-sa-email> below
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='<your-project-name>',
    job_name='dataflow-to-bigquery',
    temp_location='gs://<your-bucket-name>/temp',
    region='europe-west2',
    streaming=True,
    save_main_session=True,
    service_account_email='<your-sa-email>',
    use_public_ips=False
)

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | "Read from Pub/Sub" >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=sub)
        | "CustomParse" >> beam.ParDo(CustomParsing())
        | "Write to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
            output_table,
            schema=schema,
        )
    )
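
# To launch the job, run this file with the GCP Beam extras installed, e.g.
# `pip install 'apache-beam[gcp]'` and then `python pipeline.py` (the file name
# is an assumption). Once the job is up, you can push a test message through it
# by publishing JSON to the topic behind the subscription. A minimal sketch with
# the google-cloud-pubsub client (the topic name is an assumption):
#
#   from google.cloud import pubsub_v1
#   publisher = pubsub_v1.PublisherClient()
#   topic = publisher.topic_path('<your-project-name>', '<your-topic-name>')
#   publisher.publish(topic, b'{"country": "United Kingdom", "city": "London"}')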