Skip to content

Instantly share code, notes, and snippets.

@tspycher
Last active March 15, 2024 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tspycher/cab8525b09ef780ed84735cb2bc2d1f3 to your computer and use it in GitHub Desktop.
Save tspycher/cab8525b09ef780ed84735cb2bc2d1f3 to your computer and use it in GitHub Desktop.
A very simple Apache Beam pipeline as a warm-up exercise
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from typing import Iterable
import dataclasses
@dataclasses.dataclass
class Person:
    """A person record with a name, an e-mail address and a gender code.

    ``str(person)`` renders as ``"<name> <<email>>"``, e.g.
    ``"Tom Spycher <me@spycher.tld>"``.
    """

    # Gender codes. These are unannotated, so the dataclass machinery treats
    # them as plain class attributes rather than instance fields.
    GENDER_MALE = "m"
    GENDER_FEMALE = "f"
    GENDERS = [GENDER_MALE, GENDER_FEMALE]

    name: str
    email: str
    gender: str

    @classmethod
    def from_dict(cls, data: dict):
        """Build a ``Person`` from a mapping.

        Reads the keys ``"name"``, ``"email"`` and ``"gender"``. A missing
        key yields ``None`` for the corresponding field. The gender value,
        when present, is lower-cased.

        Args:
            data: Mapping holding the raw person attributes.

        Returns:
            A new ``Person`` instance.
        """
        gender = data.get("gender")
        # Only normalise when a value is present: a missing gender is kept as
        # None (like name and email) instead of failing on ``None.lower()``.
        if gender is not None:
            gender = gender.lower()
        return cls(data.get("name"), data.get("email"), gender)

    def __str__(self):
        """Return the person formatted as ``name <email>``."""
        return f"{self.name} <{self.email}>"
class ToModel(beam.DoFn):
    """DoFn that turns a raw dictionary into a ``Person``."""

    def process(self, elem: dict) -> Iterable[Person]:
        """Yield the ``Person`` built from ``elem`` via ``Person.from_dict``.

        Args:
            elem: Raw dictionary describing one person.

        Yields:
            Exactly one ``Person`` per input dictionary.
        """
        yield Person.from_dict(elem)
class Cleanup(beam.DoFn):
    """DoFn that normalises the e-mail and name of a ``Person``."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Yield a cleaned-up copy of ``elem``.

        The e-mail address is lower-cased and the name is title-cased; every
        other field is carried over unchanged.

        Args:
            elem: The person to normalise. It is left untouched.

        Yields:
            A new ``Person`` with the normalised ``email`` and ``name``.
        """
        # Beam requires that a DoFn never modifies its input element, so the
        # normalised values go into a copy instead of being assigned in place.
        yield dataclasses.replace(
            elem,
            email=elem.email.lower(),
            name=elem.name.title(),
        )
class SplitGenders(beam.DoFn):
    """DoFn that routes each person to an output tagged with their gender."""

    def process(self, elem: Person):
        """Yield ``elem`` wrapped in a ``TaggedOutput`` keyed by ``elem.gender``.

        Args:
            elem: The person to route.

        Yields:
            One ``beam.pvalue.TaggedOutput`` whose tag is the person's gender
            code and whose value is the person itself.
        """
        # NOTE(review): the tag is whatever gender code the element carries;
        # what happens to codes that are not declared via ``with_outputs`` is
        # decided by Beam, not by this DoFn -- confirm against the runner.
        tag = elem.gender
        yield beam.pvalue.TaggedOutput(tag, elem)
class ShowMale(beam.DoFn):
    """DoFn that prints male persons and passes them through."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Print ``elem`` with a "Male Person" prefix and yield it unchanged.

        Args:
            elem: The person to display.

        Yields:
            The same ``elem`` that was passed in.

        Raises:
            Exception: If ``elem.gender`` is not ``Person.GENDER_MALE``.
        """
        is_male = elem.gender == Person.GENDER_MALE
        if not is_male:
            raise Exception("Person is not male")

        print(f"Male Person: {elem}")
        yield elem
class ShowFemale(beam.DoFn):
    """DoFn that prints female persons and passes them through."""

    def process(self, elem: Person) -> Iterable[Person]:
        """Print ``elem`` with a "Female Person" prefix and yield it unchanged.

        Args:
            elem: The person to display.

        Yields:
            The same ``elem`` that was passed in.

        Raises:
            Exception: If ``elem.gender`` is not ``Person.GENDER_FEMALE``.
        """
        is_female = elem.gender == Person.GENDER_FEMALE
        if not is_female:
            raise Exception("Person is not female")

        print(f"Female Person: {elem}")
        yield elem
if __name__ == "__main__":
    # Demo run: build Person objects from raw dictionaries, normalise them,
    # split them by gender code, print each branch, then merge the branches
    # and print every merged element.
    options = {}
    pipeline_options = PipelineOptions(**options)

    raw_ugly_data = [
        {"name": "Tom Spycher", "email": "me@SpYcher.tld", "gender": "m"},
        {"name": "Lara Croft", "email": "tomb@raider.TLD", "gender": "f"},
        {"name": "Harrison Ford", "email": "ford@indianajones.com", "gender": "m"},
        {"name": "Undefined Person", "email": "undefined@person.com", "gender": "u"},
        {"name": "sandra Bullock", "email": "sandra@bullock.TLD", "gender": "f"},
        {"name": "pamela anderson", "email": "boobies@anderson.TLD", "gender": "f"},
    ]

    # The context manager runs the pipeline and waits for it to finish when
    # the block exits without an exception, replacing the explicit
    # run() / wait_until_finish() pair.
    with beam.Pipeline(options=pipeline_options) as p:
        genders = (
            p
            | "Create" >> beam.Create(raw_ugly_data)
            | "To Model" >> beam.ParDo(ToModel())
            | "Cleanup" >> beam.ParDo(Cleanup())
            # Only the codes listed in Person.GENDERS are declared as outputs.
            | "Gender Sorting"
            >> beam.ParDo(SplitGenders()).with_outputs(*Person.GENDERS)
        )

        processed_male = (
            genders[Person.GENDER_MALE]
            | "Show Male Person" >> beam.ParDo(ShowMale())
        )
        processed_female = (
            genders[Person.GENDER_FEMALE]
            | "Show Female Person" >> beam.ParDo(ShowFemale())
        )

        # Merge both branches back into one collection and print each person.
        (
            (processed_female, processed_male)
            | "Merge" >> beam.Flatten()
            | "output" >> beam.Map(print)
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment