Skip to content

Instantly share code, notes, and snippets.

@timhberry
Last active February 10, 2023 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timhberry/96745e6c59a8b87471c3a8ec8dd918f0 to your computer and use it in GitHub Desktop.
Save timhberry/96745e6c59a8b87471c3a8ec8dd918f0 to your computer and use it in GitHub Desktop.
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# Word-count pipeline: read a text file, split each line into words, count
# the occurrences of each distinct word, and write one "word: count" line per
# word, running on the Dataflow runner.

# Input and output files should be Cloud Storage locations
# beginning with gs://
input_file = 'gs://<your-bucket-name>/kinglear.txt'
output_path = 'gs://<your-bucket-name>/counts.txt'

# Replace <your-bucket-name>, <your-project-name> and <your-sa-email>
beam_options = PipelineOptions(
    runner='DataflowRunner',
    project='<your-project-name>',
    job_name='wordcount',
    temp_location='gs://<your-bucket-name>/temp',
    region='europe-west2',
    service_account_email='<your-sa-email>',
    use_public_ips=False,
    # The ExtractWords lambda below uses the module-level `re` import.
    # Pickling the main session ships that global state to the remote
    # workers; without it, workers can fail with
    # "NameError: name 're' is not defined".
    save_main_session=True,
)

pipeline = beam.Pipeline(options=beam_options)
(
    pipeline
    | 'ReadLines' >> beam.io.ReadFromText(input_file)
    # A "word" is a maximal run of ASCII letters and apostrophes, so
    # contractions such as "there's" stay whole. Every other character
    # (digits, punctuation, whitespace, non-ASCII letters) acts as a separator.
    | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
    # Emits (word, count) pairs, one per distinct word.
    | 'CountWords' >> beam.combiners.Count.PerElement()
    | 'FormatResults' >> beam.MapTuple(lambda word, count: f'{word}: {count}')
    # NOTE(review): WriteToText writes sharded files (e.g.
    # counts.txt-00000-of-00001) rather than a single file named exactly
    # counts.txt. Confirm this is expected.
    | 'WriteCounts' >> beam.io.WriteToText(output_path)
)

# NOTE(review): with DataflowRunner, run() typically returns once the job has
# been submitted, not when it finishes. Call .wait_until_finish() on the
# returned result if this script should block until the job completes (and
# surface job failures). Confirm the desired behavior.
pipeline.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment