Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created July 6, 2019 11:09
Show Gist options
  • Save gxercavins/933649b217ab29660502a105ddc8e892 to your computer and use it in GitHub Desktop.
Save gxercavins/933649b217ab29660502a105ddc8e892 to your computer and use it in GitHub Desktop.
Stack Overflow question 56913056
yield element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
return element
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: score
INFO:root:Event: team
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: score
INFO:root:Event: team
return [element]
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 10, 'team': 'red'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
INFO:root:Event: {'score': 8, 'team': 'blue'}
import argparse
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class LogResults(beam.DoFn):
    """Pass-through DoFn that logs each element at INFO level."""

    def process(self, element):
        """Log ``element`` and emit it unchanged to the next step.

        Args:
            element: The value received from the upstream transform.

        Returns:
            A one-item list containing ``element``.
        """
        logging.info("Event: %s", element)
        # The return value is consumed as an iterable of outputs. In the
        # sample runs recorded with this script, returning the bare dict made
        # the next step receive its keys ('score', 'team') instead of the
        # dict, while `yield element` and a one-item list both forwarded the
        # dict intact.
        outputs = [element]
        return outputs
def run(argv=None):
    """Build and run a pipeline that logs two sample events in two steps.

    Args:
        argv: Optional list of command-line arguments. The local parser
            declares no flags of its own, so every argument is handed to
            Beam as a pipeline option. ``None`` lets argparse fall back to
            ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    # Only the unrecognised remainder is needed; the parsed namespace is
    # empty because no script-specific arguments are defined.
    _, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # NOTE(review): save_main_session is the Beam option for shipping the
    # main module's globals to workers — confirm against SetupOptions docs.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    data = [
        {'team': 'red', 'score': 10},
        {'team': 'blue', 'score': 8},
    ]

    # Leaving the `with` block runs the pipeline and waits for completion,
    # replacing the explicit p.run() / wait_until_finish() pair.
    with beam.Pipeline(options=pipeline_options) as p:
        _ = (p
             | 'Create Events' >> beam.Create(data)
             | 'Log results 1' >> beam.ParDo(LogResults())
             | 'Log results 2' >> beam.ParDo(LogResults()))
if __name__ == '__main__':
    # Lower the root logger threshold to INFO so the "Event: ..." messages
    # written with logging.info are not filtered out.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment