@gxercavins
Created December 27, 2019 18:55
SO question 59458599: enriching Apache Beam elements with a schema stored in GCS. The first snippet passes the schema to the DoFn as a side input; the second downloads it from GCS in start_bundle. The schema.json used by both snippets is included at the end.
# Option 1: read the schema file with ReadFromText and pass it to the DoFn as a side input.
import argparse, json, logging

import apache_beam as beam
import apache_beam.pvalue as pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class EnrichElementsFn(beam.DoFn):
    """Zips data with the schema stored in GCS, received as a side input."""
    def process(self, element, schema):
        field_names = [x['name'] for x in json.loads(schema)]
        # Materialize the pairs so they show up when logged (zip is lazy in Python 3).
        yield list(zip(field_names, element))


class LogElementsFn(beam.DoFn):
    """Prints element information"""
    def process(self, element):
        logging.info(element)
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    BUCKET = 'BUCKET_NAME'

    data = [('NC', 'F', 2020, 'Hello', 3200),
            ('NC', 'F', 2020, 'World', 3180)]

    # NOTE: ReadFromText emits one element per line, so schema.json must fit on a
    # single line for AsSingleton to receive exactly one element.
    schema = (p
        | 'Read Schema from GCS' >> ReadFromText('gs://{}/schema.json'.format(BUCKET)))

    (p
        | 'Create Events' >> beam.Create(data)
        | 'Enrich with side input' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema))
        | 'Log elements' >> beam.ParDo(LogElementsFn()))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
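
As a quick local sanity check of the side-input wiring, the DoFn above can be exercised with Beam's testing utilities. This is only a sketch: it assumes EnrichElementsFn is importable from the snippet above and inlines an abridged schema instead of reading it from GCS.

import json

import apache_beam as beam
import apache_beam.pvalue as pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Abridged schema: the DoFn only looks at the 'name' key of each field.
schema_str = json.dumps([{"name": "state"}, {"name": "gender"}, {"name": "year"},
                         {"name": "name"}, {"name": "number"}])

with TestPipeline() as p:
    schema = p | 'Schema' >> beam.Create([schema_str])
    enriched = (p
        | 'Events' >> beam.Create([('NC', 'F', 2020, 'Hello', 3200)])
        | 'Enrich' >> beam.ParDo(EnrichElementsFn(), pvalue.AsSingleton(schema)))
    # The pipeline runs (and the assertion is checked) when the with-block exits.
    assert_that(enriched, equal_to([[('state', 'NC'), ('gender', 'F'), ('year', 2020),
                                     ('name', 'Hello'), ('number', 3200)]]))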
# Option 2: skip the side input and download the schema from GCS in start_bundle(),
# which runs before each bundle of elements is processed.
import argparse, json, logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


BUCKET = 'BUCKET_NAME'


class EnrichElementsFn(beam.DoFn):
    """Zips data with the schema stored in GCS, fetched in start_bundle."""
    def start_bundle(self):
        # Import here so the client is only needed on the workers at bundle time.
        from google.cloud import storage
        client = storage.Client()
        blob = client.get_bucket(BUCKET).get_blob('schema.json')
        self.schema = blob.download_as_string()

    def process(self, element):
        field_names = [x['name'] for x in json.loads(self.schema)]
        # Materialize the pairs so they show up when logged (zip is lazy in Python 3).
        yield list(zip(field_names, element))


class LogElementsFn(beam.DoFn):
    """Prints element information"""
    def process(self, element):
        logging.info(element)
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    data = [('NC', 'F', 2020, 'Hello', 3200),
            ('NC', 'F', 2020, 'World', 3180)]

    (p
        | 'Create Events' >> beam.Create(data)
        | 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn())
        | 'Log elements' >> beam.ParDo(LogElementsFn()))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
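
Newer Beam Python SDKs also offer DoFn.setup(), which runs once per DoFn instance rather than once per bundle, so the GCS call can be made even less often. A minimal sketch of that variant, assuming the same module-level BUCKET constant (EnrichElementsSetupFn is a hypothetical name, not part of the gist):

import json

import apache_beam as beam

BUCKET = 'BUCKET_NAME'  # same placeholder bucket as above


class EnrichElementsSetupFn(beam.DoFn):  # hypothetical variant, not in the gist
    """Same enrichment, but the schema is downloaded once per DoFn instance."""
    def setup(self):
        from google.cloud import storage
        client = storage.Client()
        blob = client.get_bucket(BUCKET).get_blob('schema.json')
        self.schema = blob.download_as_string()

    def process(self, element):
        field_names = [x['name'] for x in json.loads(self.schema)]
        yield list(zip(field_names, element))

The schema.json referenced by both snippets is listed below.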
[
    {
        "description": "2-digit state code",
        "type": "STRING",
        "name": "state",
        "mode": "NULLABLE"
    },
    {
        "description": "Sex (M=male or F=female)",
        "type": "STRING",
        "name": "gender",
        "mode": "NULLABLE"
    },
    {
        "description": "4-digit year of birth",
        "type": "INTEGER",
        "name": "year",
        "mode": "NULLABLE"
    },
    {
        "description": "Given name of a person at birth",
        "type": "STRING",
        "name": "name",
        "mode": "NULLABLE"
    },
    {
        "description": "Number of occurrences of the name",
        "type": "INTEGER",
        "name": "number",
        "mode": "NULLABLE"
    }
]
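
For reference, the list comprehension used in both DoFns keeps only the name key of each field, which is why an abridged schema is enough for local testing. A small sketch, assuming the JSON above is saved locally as schema.json:

import json

with open('schema.json') as f:
    schema = f.read()

field_names = [x['name'] for x in json.loads(schema)]
print(field_names)
# ['state', 'gender', 'year', 'name', 'number']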