Created
December 27, 2019 18:55
-
-
Save gxercavins/1b731afaa4be9b1c8784112e491d744b to your computer and use it in GitHub Desktop.
Stack Overflow question 59458599
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse, json, logging | |
import apache_beam as beam | |
import apache_beam.pvalue as pvalue | |
from apache_beam.io import ReadFromText | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from apache_beam.options.pipeline_options import SetupOptions | |
class EnrichElementsFn(beam.DoFn):
    """Zips data with schema stored in GCS.

    The schema arrives as a side input holding the text of a JSON list of
    field objects; only each object's ``'name'`` key is used. Each input
    element is paired positionally with those names.
    """

    def process(self, element, schema):
        """Pair the values of ``element`` with the schema's field names.

        Args:
          element: A sequence of field values, in schema order.
          schema: JSON text of the schema: a list of objects that each have
            a ``'name'`` key.

        Yields:
          A list of ``(field_name, value)`` tuples. ``zip`` stops at the
          shorter sequence, so surplus values or surplus fields are dropped.
        """
        # The side-input value is normally identical for every element, so
        # parse it only when the text changes instead of once per element.
        if getattr(self, '_schema_text', None) != schema:
            self._field_names = [field['name'] for field in json.loads(schema)]
            self._schema_text = schema
        # Materialize the pairs: on Python 3 ``zip`` returns a one-shot lazy
        # iterator, which logs as ``<zip object ...>`` and is empty once any
        # step has consumed it.
        yield list(zip(self._field_names, element))
class LogElementsFn(beam.DoFn):
    """Pass-through step: write each element to the INFO log, then re-emit it."""

    def process(self, element):
        # Side effect only; the element itself is forwarded untouched so
        # this step can be dropped into any pipeline for debugging.
        logging.info(element)
        yield element
def run(argv=None):
    """Build and run the side-input variant of the example pipeline.

    The schema file is read as one string into a single-element PCollection
    and handed to ``EnrichElementsFn`` as a singleton side input.

    Args:
      argv: Command-line arguments. Anything this parser does not recognise
        (it defines nothing beyond the default ``--help``) is forwarded to
        Beam as pipeline options.
    """
    # Whole-file reading transforms (MatchFiles / ReadMatches).
    from apache_beam.io import fileio

    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    BUCKET = 'BUCKET_NAME'
    data = [('NC', 'F', 2020, 'Hello', 3200),
            ('NC', 'F', 2020, 'World', 3180)]

    # ReadFromText emits one element per *line*. A pretty-printed
    # schema.json therefore becomes many elements, which makes AsSingleton
    # fail (and no single line is valid JSON). Read the file as one string.
    schema = (p
              | 'Match schema file' >> fileio.MatchFiles(
                  'gs://{}/schema.json'.format(BUCKET))
              | 'Open schema file' >> fileio.ReadMatches()
              | 'Read Schema from GCS' >> beam.Map(
                  lambda readable_file: readable_file.read_utf8()))

    (p
     | 'Create Events' >> beam.Create(data)
     | 'Enrich with side input' >> beam.ParDo(
         EnrichElementsFn(), pvalue.AsSingleton(schema))
     | 'Log elements' >> beam.ParDo(LogElementsFn()))

    result = p.run()
    result.wait_until_finish()
if __name__ == '__main__':
    # Lower the root logger's threshold to INFO so records emitted by
    # LogElementsFn are shown, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse, json, logging | |
import apache_beam as beam | |
import apache_beam.pvalue as pvalue | |
from apache_beam.io import ReadFromText | |
from apache_beam.options.pipeline_options import PipelineOptions | |
from apache_beam.options.pipeline_options import SetupOptions | |
# GCS bucket that holds schema.json; placeholder value to be replaced.
BUCKET = 'BUCKET_NAME'
class EnrichElementsFn(beam.DoFn):
    """Zips data with schema stored in GCS.

    The schema is read from ``gs://<BUCKET>/schema.json``. It is a JSON list
    of field objects, of which only each object's ``'name'`` key is used.
    Each input element is paired positionally with those names.
    """

    def start_bundle(self):
        """Fetch and parse the schema the first time this instance needs it.

        Beam calls ``start_bundle`` before every bundle and may reuse a DoFn
        instance for many bundles. Caching here avoids recreating the client
        and re-downloading the same file each time.

        Raises:
          ValueError: If ``schema.json`` does not exist in ``BUCKET``.
        """
        if getattr(self, '_field_names', None) is not None:
            return
        # Imported lazily so the GCS client library is only needed where
        # the DoFn actually executes.
        from google.cloud import storage
        client = storage.Client()
        blob = client.get_bucket(BUCKET).get_blob('schema.json')
        if blob is None:
            # get_blob returns None for a missing object; fail with a clear
            # message rather than an AttributeError on None.
            raise ValueError(
                'schema.json not found in bucket {}'.format(BUCKET))
        self.schema = blob.download_as_string()
        # Parse once here instead of on every element in process().
        self._field_names = [field['name'] for field in json.loads(self.schema)]

    def process(self, element):
        """Yield a list of ``(field_name, value)`` tuples for ``element``.

        ``zip`` stops at the shorter sequence, so surplus values or surplus
        fields are dropped.
        """
        # Materialize the pairs: on Python 3 ``zip`` is a one-shot lazy
        # iterator that logs as ``<zip object ...>`` and is empty once read.
        yield list(zip(self._field_names, element))
class LogElementsFn(beam.DoFn):
    """Debug helper: logs every element at INFO level and passes it on."""

    def process(self, element):
        # Record the element, then forward the very same object downstream.
        logging.info(element)
        yield element
def run(argv=None):
    """Build and run the start_bundle variant of the example pipeline.

    No side input is involved: ``EnrichElementsFn`` fetches the schema from
    GCS on its own.

    Args:
      argv: Command-line arguments. Anything this parser does not recognise
        (it defines nothing beyond the default ``--help``) is forwarded to
        Beam as pipeline options.
    """
    arg_parser = argparse.ArgumentParser()
    _, beam_args = arg_parser.parse_known_args(argv)

    options = PipelineOptions(beam_args)
    options.view_as(SetupOptions).save_main_session = True

    pipeline = beam.Pipeline(options=options)

    sample_rows = [
        ('NC', 'F', 2020, 'Hello', 3200),
        ('NC', 'F', 2020, 'World', 3180),
    ]

    # Wire the three steps one at a time instead of in a single chain.
    events = pipeline | 'Create Events' >> beam.Create(sample_rows)
    enriched = events | 'Enrich with start bundle' >> beam.ParDo(EnrichElementsFn())
    _ = enriched | 'Log elements' >> beam.ParDo(LogElementsFn())

    pipeline.run().wait_until_finish()
if __name__ == '__main__':
    # The root logger defaults to WARNING; drop it to INFO so the output of
    # LogElementsFn appears, then start the pipeline.
    logging.root.setLevel(logging.INFO)
    run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ | |
{ | |
"description": "2-digit state code", | |
"type": "STRING", | |
"name": "state", | |
"mode": "NULLABLE" | |
}, | |
{ | |
"description": "Sex (M=male or F=female)", | |
"type": "STRING", | |
"name": "gender", | |
"mode": "NULLABLE" | |
}, | |
{ | |
"description": "4-digit year of birth", | |
"type": "INTEGER", | |
"name": "year", | |
"mode": "NULLABLE" | |
}, | |
{ | |
"description": "Given name of a person at birth", | |
"type": "STRING", | |
"name": "name", | |
"mode": "NULLABLE" | |
}, | |
{ | |
"description": "Number of occurrences of the name", | |
"type": "INTEGER", | |
"name": "number", | |
"mode": "NULLABLE" | |
} | |
] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment