@fozziethebeat
Created November 30, 2021 01:09
Simple Beam CAN Pipeline
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from libs.datasets import combined_datasets
from pyseir.run import OneRegionPipeline


class ProcessOneRegion(beam.DoFn):
    """Runs the pyseir pipeline for one region and emits its infer_df as CSV text."""

    def process(self, one_region):
        yield OneRegionPipeline.run(one_region).infer_df.to_csv()


beam_options = PipelineOptions()
with beam.Pipeline(options=beam_options) as pipeline:
    # Load the US timeseries dataset and take the subset for NY and CA.
    regions = combined_datasets.load_us_timeseries_dataset().get_subset(
        states=['NY', 'CA'])
    # Materialize the per-region datasets as a list so beam.Create can consume them.
    regions = [one_region for _, one_region in regions.iter_one_regions()]
    # 'out.csv' is a file name prefix; Beam appends a shard suffix to each output file.
    (pipeline
     | 'Read Regions' >> beam.Create(regions)
     | 'Process One Region' >> beam.ParDo(ProcessOneRegion())
     | 'Write CSV' >> beam.io.WriteToText('out.csv'))