Skip to content

Instantly share code, notes, and snippets.

@pascaldelange
Last active March 25, 2021 12:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pascaldelange/ea1a0907559a2d4bd5706d1d51f2c91b to your computer and use it in GitHub Desktop.
Save pascaldelange/ea1a0907559a2d4bd5706d1d51f2c91b to your computer and use it in GitHub Desktop.
class ReadPubSub(beam.PTransform):
    """Read company-profile and bank-account events from Pub/Sub as keyed records.

    For each entity, messages from the ``<entity>.create`` topic are passed
    through as-is, while messages from the ``<entity>.update`` topic are
    reduced to their ``"new"`` field. The two streams are merged and each
    element is tagged, so the output is a single PCollection of
    ``(key, payload)`` tuples where ``key`` is ``"company_profiles"`` or
    ``"bank_accounts"`` (presumably used downstream to route the document —
    confirm against WriteToElasticDoFn).

    Topics are resolved under ``projects/<PROJECT_ID>/topics/``.

    NOTE: transform labels are kept exactly as in the original pipeline;
    Dataflow matches transforms by name when updating a running streaming
    job, so renaming them would break in-place updates.
    """

    def expand(self, pcoll):
        """Build the read graph for both entities and flatten it into one PCollection."""
        topic_root = "projects/{}/topics/".format(PROJECT_ID)
        companies = self._read_entity(
            pcoll,
            topic_root,
            topic_prefix="companies",
            label_noun="company",
            key="company_profiles",
        )
        bank_accounts = self._read_entity(
            pcoll,
            topic_root,
            topic_prefix="accounts",
            label_noun="account",
            key="bank_accounts",
        )
        return (
            companies,
            bank_accounts,
        ) | "Flatten output" >> beam.Flatten()

    @staticmethod
    def _read_entity(pcoll, topic_root, topic_prefix, label_noun, key):
        """Merge one entity's create and update streams into ``(key, payload)`` tuples.

        Args:
            pcoll: the input the Pub/Sub reads are applied to (the pipeline root).
            topic_root: ``"projects/<id>/topics/"`` prefix for topic paths.
            topic_prefix: topic/label stem, e.g. ``"companies"``; reads
                ``<topic_prefix>.create`` and ``<topic_prefix>.update``.
            label_noun: singular noun used in transform labels, e.g. ``"company"``.
            key: constant first element attached to every output record.

        Returns:
            A PCollection of ``(key, payload)`` tuples.
        """
        created = pcoll | "Read {}.create".format(
            topic_prefix
        ) >> dataflow_tools.ReadFromPubsub(topic_root + topic_prefix + ".create")
        updated = (
            pcoll
            | "Read {}.update".format(topic_prefix)
            >> dataflow_tools.ReadFromPubsub(topic_root + topic_prefix + ".update")
            # Update messages wrap the document; only the post-update state is kept.
            | "Get new {} payload".format(label_noun)
            >> beam.Map(lambda message: message["new"])
        )
        return (
            (created, updated)
            | "Merge {}".format(topic_prefix) >> beam.Flatten()
            | "Add {} key".format(label_noun)
            >> beam.Map(lambda payload: (key, payload))
        )
# Pipeline: stream create/update events for company profiles and bank accounts
# from Pub/Sub into Elasticsearch.
p = beam.Pipeline(options=pipeline_options)

keyed_events = p | "Read pubsub input" >> ReadPubSub()

# The ParDo's output is named `errors`: it presumably carries only the
# operations Elasticsearch rejected (confirm against WriteToElasticDoFn).
errors = keyed_events | "Write to elasticsearch" >> beam.ParDo(WriteToElasticDoFn())

# Persist failed operations in an existing BigQuery table (never created here,
# only appended to) so they can be retried later.
error_rows = errors | "Cast error for insertion" >> beam.Map(cast_error)
error_rows | "BQ Insert" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)

p.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment