Skip to content

Instantly share code, notes, and snippets.

@andy51002000
Created February 15, 2021 03:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andy51002000/aee3bba1a5e1a21da4c1bfca3998e6fb to your computer and use it in GitHub Desktop.
Save andy51002000/aee3bba1a5e1a21da4c1bfca3998e6fb to your computer and use it in GitHub Desktop.
Dataflow (Apache Beam) example: count how often each word occurs in a text file
class WordcountOptions(PipelineOptions):
    """Command-line options used by the word-count example.

    Adds two flags on top of the base options: ``--input``, which has a
    default path, and ``--output``, which must always be supplied.
    """

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register the ``--input`` and ``--output`` flags on *parser*.

        Args:
            parser: Object exposing ``add_argument``. Presumably the
                argparse parser handed in by ``PipelineOptions`` -- confirm
                against the base class.
        """
        # One (flag, keyword-settings) pair per option, registered in order.
        flag_specs = (
            ('--input', {
                'default': 'gs://dataflow-samples/shakespeare/kinglear.txt',
                'help': 'Path of the file to read from',
            }),
            ('--output', {
                'required': True,
                'help': 'Output file to write results to.',
            }),
        )
        for flag, settings in flag_specs:
            parser.add_argument(flag, **settings)
# NOTE(review): this snippet relies on `re`, `beam`, `PipelineOptions` and
# `InteractiveRunner` being imported elsewhere; no import lines are visible.

# Options are built from an explicit flag list; only --output is given, so
# --input falls back to the default declared on WordcountOptions.
pipeline_options = PipelineOptions(['--output', './result.txt'])
p = beam.Pipeline(runner=InteractiveRunner(), options=pipeline_options)
wordcount_options = pipeline_options.view_as(WordcountOptions)

# Each stage is a labelled transform. Only the transforms are named here;
# no intermediate PCollection is bound to a variable.
read_lines = 'ReadCollection' >> beam.io.ReadFromText(wordcount_options.input)
split_words = 'findWord' >> beam.FlatMap(
    lambda text: re.findall(r'[\w\']+', text.strip(), re.UNICODE))
lowercase = "lower" >> beam.Map(lambda token: token.lower())
tally = "lower_count" >> beam.combiners.Count.PerElement()

# Read lines -> split into words -> lowercase -> count each distinct element.
# The visible code neither runs the pipeline nor uses wordcount_options.output.
# Presumably `count` yields (word, occurrences) pairs -- confirm against Beam.
count = p | read_lines | split_words | lowercase | tally
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment