# Skip to content
#
# Instantly share code, notes, and snippets.
#
# Last active February 10, 2023 08:00
# Show Gist options
#   • Save timhberry/f80deddd96e14ead805a125ef6067dc0 to your computer and use it in GitHub Desktop.
# Save timhberry/f80deddd96e14ead805a125ef6067dc0 to your computer and use it in GitHub Desktop.
from datetime import datetime
import logging
import random
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
# Your Pub/Sub topic, format: projects/<your-project-name>/topics/<your-topic-name>
input_topic = ''
# Window length in minutes: GroupMessagesByFixedWindows multiplies it by 60 to
# obtain the fixed-window size in seconds.
window_size = 1.0
# Number of distinct random keys (shards) assigned to the elements of a window.
num_shards = 5
# Output path: files are written with this prefix; it must begin with gs://
output_path = ''
# Replace <your-project-name>, <your-bucket-name> and <your-sa-email> below.
# NOTE(review): the argument list of this call was missing from the file (the
# call was left unclosed). The options below are reconstructed from the
# placeholders named in the comment above; `runner` and `region` are
# assumptions -- confirm all of them against your deployment.
pipeline_options = PipelineOptions(
    # Reading from Pub/Sub is an unbounded source, so streaming mode is needed.
    streaming=True,
    runner='DataflowRunner',
    project='<your-project-name>',
    region='us-central1',
    temp_location='gs://<your-bucket-name>/temp',
    service_account_email='<your-sa-email>',
)
class GroupMessagesByFixedWindows(PTransform):
    """Group Pub/Sub messages into fixed windows, split into random shards.

    A composite transform that groups Pub/Sub messages based on publish time
    and outputs, per window and shard key, the grouped
    ``(message, publish time)`` tuples produced by ``AddTimestamp``.
    """

    def __init__(self, window_size, num_shards=5):
        """Store the window length and shard count.

        Args:
            window_size: Window length in minutes (may be fractional).
            num_shards: Number of random keys used to spread the elements of
                each window; defaults to 5.
        """
        # FixedWindows takes seconds, so convert minutes to whole seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        """Apply windowing, timestamp extraction, keying and grouping.

        Args:
            pcoll: The input PCollection of Pub/Sub messages.

        Returns:
            A PCollection of ``(shard_key, grouped elements)`` pairs, one per
            window and shard key.
        """
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )
class AddTimestamp(DoFn):
    """Pair each message body with its formatted publish time."""

    def process(self, element, publish_time=DoFn.TimestampParam):
        """Process a windowed element into a ``(body, publish time)`` tuple.

        Args:
            element: The Pub/Sub message payload. Presumably UTF-8 encoded
                bytes, as emitted by ``ReadFromPubSub`` -- confirm if the
                source is changed.
            publish_time: The element timestamp injected by Beam through
                ``DoFn.TimestampParam``.

        Yields:
            A tuple of the decoded message body and the publish time formatted
            as ``"%Y-%m-%d %H:%M:%S.%f"`` (UTC).
        """
        # NOTE(review): the tuple contents were missing from the file; only the
        # format string survived. The decode/format calls are reconstructed.
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )
class WriteToGCS(DoFn):
    """Write each grouped batch of messages to one Cloud Storage object."""

    def __init__(self, output_path):
        """Store the output prefix.

        Args:
            output_path: Prefix of every object name that is written.
        """
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage.

        Args:
            key_value: A ``(shard_id, batch)`` pair where ``batch`` iterates
                over ``(message_body, publish_time)`` tuples.
            window: The element's window, injected by Beam through
                ``DoFn.WindowParam``; its start and end name the object.
        """
        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        # Object name: <output_path>-<window start>-<window end>-<shard id>
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                # NOTE(review): the loop body was missing from the file. It is
                # reconstructed as one "body,publish_time" line per message,
                # encoded to bytes on the assumption that the GCS stream is
                # binary -- confirm the desired output format.
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))
# Build and run the streaming pipeline: read from Pub/Sub, group the messages
# into fixed windows and shards, then write each group to Cloud Storage.
# Leaving the `with` block runs the pipeline.
with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
        # binds the publish time returned by the Pub/Sub server for each message
        # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
        | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
        | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
        | "Write to GCS" >> ParDo(WriteToGCS(output_path))
    )
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment