Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created February 8, 2020 14:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gxercavins/7645731c7a62dd4bb620f820e99bd3ad to your computer and use it in GitHub Desktop.
Save gxercavins/7645731c7a62dd4bb620f820e99bd3ad to your computer and use it in GitHub Desktop.
Stack Overflow question 60080589
import argparse, logging
import pandas as pd
from random import choice
import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
import apache_beam.transforms.combiners as combine
import apache_beam.pvalue as pvalue
def create_dataframes(element):
    """Yield ten single-row DataFrames, each naming a randomly chosen project.

    Args:
        element: The input element that triggers generation. Its value is
            never read; it only exists so the function can be used as a
            ``FlatMap`` callable.

    Yields:
        pandas.DataFrame: A frame with a single ``projects`` column holding
        one randomly selected project name.
    """
    # A tuple, because the candidate names are fixed and never mutated.
    project_names = ('Beam', 'Spark', 'Flink', 'Nemo', 'Airflow')
    for _ in range(10):
        yield pd.DataFrame({'projects': [choice(project_names)]})
class merge_dataframes(beam.DoFn):
    """DoFn that concatenates a collection of DataFrames into a single one."""

    def process(self, element):
        """Log the incoming element and its type, then yield the merged frame.

        Args:
            element: The DataFrames to merge; handed directly to
                ``pd.concat``.

        Yields:
            The concatenated DataFrame with its index rebuilt from scratch
            (the per-frame indexes are dropped).
        """
        # Log the value first, then its type, for debugging.
        for logged in (element, type(element)):
            logging.info(logged)

        merged = pd.concat(element)
        yield merged.reset_index(drop=True)
def run_pipeline(argv=None):
    """Build the demo pipeline and run it, blocking until it finishes.

    The pipeline emits a single seed element, expands it into several
    DataFrames, gathers them into one list, merges that list into a single
    DataFrame and prints the result.

    Args:
        argv: Optional list of command-line arguments. When ``None``,
            ``argparse`` falls back to ``sys.argv``. All arguments are
            forwarded to Beam as pipeline options.
    """
    # Imported from its defining module instead of being reached through
    # ``beam.pipeline``, where it is only available as an incidental import.
    from apache_beam.options.pipeline_options import PipelineOptions

    parser = argparse.ArgumentParser()
    # No custom flags are declared, so every argument ends up in the
    # "unknown" list that is handed over to Beam.
    _, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    # Pickle the main session so module-level imports (e.g. ``pd``) are
    # available to the functions when they execute on workers.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    # The context manager runs the pipeline and waits for completion on exit.
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Start' >> beam.Create(['Start'])
         | 'Create DataFrames' >> beam.FlatMap(create_dataframes)
         | 'Combine To List' >> beam.combiners.ToList()
         | 'Merge DataFrames' >> beam.ParDo(merge_dataframes())
         | 'Print results' >> beam.Map(print))
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    # Raise the root logger to INFO so the pipeline's info-level logs show up.
    logging.getLogger().setLevel(level=logging.INFO)
    run_pipeline()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment