Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created May 24, 2019 18:36
Show Gist options
Save gxercavins/0d336c0f47ee9156e141e8f13c98b682 to your computer and use it in GitHub Desktop.
Stack Overflow question 56295585
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
# input file pattern will be a template parameter
class CustomPipelineOptions(PipelineOptions):
    """Pipeline options specific to this job.

    ``--input`` is registered as a ValueProvider argument. When the pipeline
    is staged as a Dataflow classic template, the file pattern can then be
    supplied at template execution time. It is not fixed when the template
    is created.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register this job's options on Beam's option parser.

        Args:
          parser: The argument parser Beam passes to ``_add_argparse_args``.
            Besides the standard argparse API, it offers
            ``add_value_provider_argument`` for templatable options.
        """
        # Plain add_argument would resolve --input at graph-construction time,
        # freezing it into a template. add_value_provider_argument defers the
        # lookup to run time. ReadFromText accepts the resulting ValueProvider
        # as its file pattern. Without --input, the default below is used.
        parser.add_value_provider_argument(
            '--input',
            dest='input',
            default='gs://dataflow-samples/shakespeare/*.txt',
            help='Input path with file to process.')
def run(argv=None):
    """Main entry point; defines and runs the wordcount pipeline.

    The pipeline reads every line matched by the ``--input`` option. It
    writes those lines back out through ``WriteToText`` with the
    ``output.txt`` prefix.

    Args:
      argv: Command-line arguments. When None, argparse falls back to
        ``sys.argv[1:]``. Every argument this parser does not recognise is
        forwarded to Beam as a pipeline option.
    """
    # This parser declares no arguments of its own. Apart from -h/--help,
    # every flag therefore ends up in ``beam_args``.
    _, beam_args = argparse.ArgumentParser().parse_known_args(argv)
    options = PipelineOptions(beam_args)
    source_pattern = options.view_as(CustomPipelineOptions).input
    options.view_as(SetupOptions).save_main_session = True

    pipeline = beam.Pipeline(options=options)
    lines = pipeline | 'Read Files' >> beam.io.ReadFromText(source_pattern)
    _ = lines | 'Write Results' >> WriteToText("output.txt")

    # NOTE(review): when this script is only used to stage a Dataflow
    # template, blocking on the result may not be meaningful. Confirm the
    # behaviour against the Beam version in use.
    pipeline.run().wait_until_finish()
if __name__ == '__main__':
    # logging.root is the same logger that logging.getLogger() returns.
    # Lowering it to INFO shows the pipeline's progress messages.
    logging.root.setLevel(logging.INFO)
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment