@robertwb
Last active December 14, 2022 21:33
Apache Beam yaml wordcount
pipeline:
  - type: chain
    transforms:
      - type: ReadFromCsv
        path: "gs://apache-beam-samples/nasa_jpl_asteroid/sample_1000.csv"
      - type: PyFilter
        fn: "lambda asteroid: (asteroid.diameter or 0) > 250"
      - type: PyMap
        fn: print

pipeline:
  # Bare transforms can be listed here.
  - type: Create
    name: Numbers
    elements: [1, 2, 3]
  # We can structure our code into composites.
  - type: composite
    name: DoSomeMath
    # These are local names referencing outputs in the enclosing scope.
    input:
      numbers: Numbers
    # This indicates which (transform's) outputs should be considered outputs
    # of this composite.
    output:
      both: Flatten
    transforms:
      - type: PyMap
        name: Square
        input: numbers
        fn: "lambda x: x*x"
      - type: PyMap
        name: Cube
        input: numbers
        fn: "lambda x: x*x*x"
      - type: Flatten
        input:
          first: Square
          second: Cube
  # Consume the result of the composite.
  - type: PyMap
    input: DoSomeMath  # or DoSomeMath.both
    fn: "import logging\nlogging.error"

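To check what the DoSomeMath composite should emit, here is a plain-Python sketch of the same dataflow. It does not use Beam at all; the function name `do_some_math` is hypothetical and exists only for illustration. A real Flatten gives no ordering guarantee, so the sketch sorts the result before printing.

```python
def do_some_math(numbers):
    """Mimic the composite: square and cube every input, then merge both."""
    square = [x * x for x in numbers]        # corresponds to the "Square" PyMap
    cube = [x * x * x for x in numbers]      # corresponds to the "Cube" PyMap
    return square + cube                     # corresponds to Flatten


result = do_some_math([1, 2, 3])
print(sorted(result))  # [1, 1, 4, 8, 9, 27]
```

Both branches read the same `numbers` input, which is why the composite needs explicit `input:` wiring instead of a chain.
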
pipeline:
  # A chain is the simplest type of transform.
  # Each transform's output feeds into the next transform's input.
  - type: chain
    transforms:
      - type: Create
        elements: ['to', 'be', 'or', 'not', 'to', 'be']
      # This will give a schema'd PCollection.
      - type: PyMap
        fn: "lambda x: beam.Row(word=x, len=len(x))"
      - type: Sql
        query: "SELECT word, count(*) as c FROM PCOLLECTION GROUP BY word"
      - type: PyMap
        fn: print

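As a sanity check on the SQL step, the following plain-Python sketch computes the same grouping without Beam. The dictionaries stand in for the `beam.Row(word=..., len=...)` elements, and `collections.Counter` stands in for `GROUP BY word` with `count(*)`; this is an illustration of the expected result, not how the Sql transform is implemented.

```python
from collections import Counter

words = ['to', 'be', 'or', 'not', 'to', 'be']

# Stand-in for the schema'd rows produced by the PyMap step.
rows = [{"word": w, "len": len(w)} for w in words]

# Stand-in for: SELECT word, count(*) as c FROM PCOLLECTION GROUP BY word
counts = Counter(row["word"] for row in rows)

for word, c in sorted(counts.items()):
    print(word, c)
```

The pipeline itself prints one row per distinct word, in no particular order.
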
# These are actually built-in, but provided explicitly as an example.
# Though these examples use Beam-provided transforms, any externally
# packaged transforms would work here as well.
providers:
  - type: beamJar
    gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'
    version: '2.43.0'
    transforms:
      Sql: 'beam:external:java:sql:v1'
  # Multiple providers can be listed for the same transform(s), as long as
  # they provide semantically the same operation.
  # This can be useful to offer alternative implementations of the expansion
  # service (e.g. java vs docker) depending on what a user has installed on
  # their machine.
  - type: mavenJar
    artifact_id: 'beam-sdks-java-extensions-sql-expansion-service'
    group_id: 'org.apache.beam'
    version: '2.43.0'
    transforms:
      Sql: 'beam:external:java:sql:v1'
  - type: pypi
    packages:
      # Typically Beam would be a dependency of whatever packages are
      # listed here, but this is just for a working example.
      - apache_beam
    transforms:
      PyPiGroupByKey: 'apache_beam.GroupByKey'

pipeline:
  - type: chain
    transforms:
      - type: ReadFromText
        args:
          file_pattern: "gs://dataflow-samples/shakespeare/kinglear.txt"
      - type: PyMap
        fn: "str.lower"
      - type: PyFlatMap
        fn: "import re\nlambda line: re.findall('[a-z]+', line)"
      - type: PyTransform
        name: Count
        constructor: "apache_beam.transforms.combiners.Count.PerElement"
      - type: PyMap
        fn: str
      - type: PyMap
        fn: "import logging\nlogging.error"
      # - type: WriteToText
      #   file_path_prefix: "counts.txt"
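
The wordcount chain above can be approximated in plain Python to see what each step contributes. This sketch does not use Beam; `count_words` is a hypothetical helper, and the sample lines are made up for illustration rather than taken from kinglear.txt.

```python
import re
from collections import Counter


def count_words(lines):
    """Lowercase each line, split it into [a-z]+ tokens, and count tokens."""
    counts = Counter()
    for line in lines:
        lowered = line.lower()                     # PyMap: str.lower
        tokens = re.findall('[a-z]+', lowered)     # PyFlatMap: re.findall
        counts.update(tokens)                      # Count.PerElement
    return counts


# Made-up input lines, used only to exercise the function.
sample = ["To be, or not TO be", "that is the question"]
for pair in sorted(count_words(sample).items()):
    print(str(pair))                               # PyMap: str
```

Because the text is lowercased before tokenizing, "To" and "TO" are counted as the same word, and punctuation never reaches the counter.
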