Last active
February 10, 2023 08:00
-
-
Save timhberry/f80deddd96e14ead805a125ef6067dc0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging
import random
from datetime import datetime, timezone

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows
# Pub/Sub topic the pipeline reads from.
# Format: projects/<your-project-name>/topics/<your-topic-name>
input_topic = ''

# Window length handed to GroupMessagesByFixedWindows, which multiplies it by
# 60 before building the fixed windows (i.e. the value is in minutes).
window_size = 1.0

# Number of random keys each window's messages are spread across.
num_shards = 5

# Prefix for every output object name; must begin with gs://
output_path = ''

# Replace <your-project-name>, <your-bucket-name> and <your-sa-email> below.
_dataflow_settings = {
    'runner': 'DataflowRunner',
    'project': '<your-project-name>',
    'job_name': 'pubsub-job',
    'temp_location': 'gs://<your-bucket-name>/temp',
    'region': 'europe-west2',
    'streaming': True,
    'save_main_session': True,
    'service_account_email': '<your-sa-email>',
    'use_public_ips': False,
}
pipeline_options = PipelineOptions(**_dataflow_settings)
class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that batches Pub/Sub messages per window and shard.

    Each input element is assigned to a fixed window, turned into a
    ``(message, publish_time)`` tuple by ``AddTimestamp``, tagged with a random
    integer key in ``[0, num_shards - 1]`` and grouped by that key. The output
    elements are therefore ``(key, iterable of (message, publish_time))``
    pairs, one per key per window.

    Args:
        window_size: Window length in minutes. It is multiplied by 60 and
            truncated to a whole number of seconds.
        num_shards: Number of distinct random keys used to spread the
            elements of a window.
    """

    def __init__(self, window_size, num_shards=5):
        # Let the PTransform base class set up its own state first.
        super().__init__()
        # `window_size` is given in minutes; the windowing below uses seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        """Apply windowing, timestamp extraction, random keying and grouping."""
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )
class AddTimestamp(DoFn):
    """Pair each message body with its element timestamp rendered as text."""

    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.

        Args:
            element: The message payload as UTF-8 encoded bytes.
            publish_time: The element timestamp injected by Beam; it is
                converted with ``float()`` and read as seconds since the
                epoch.

        Yields:
            A ``(message_body, publish_time)`` tuple of two strings, where the
            time is UTC formatted as ``%Y-%m-%d %H:%M:%S.%f``.
        """
        # `datetime.utcfromtimestamp` is deprecated since Python 3.12; the
        # timezone-aware conversion formats to exactly the same string.
        published_at = datetime.fromtimestamp(float(publish_time), tz=timezone.utc)
        yield (
            element.decode("utf-8"),
            published_at.strftime("%Y-%m-%d %H:%M:%S.%f"),
        )
class WriteToGCS(DoFn):
    """DoFn that writes each keyed batch of messages to one GCS object."""

    def __init__(self, output_path):
        # Prefix shared by every object name produced in `process`.
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""
        # Window bounds rendered as UTC hour:minute, start first, then end.
        bounds = [
            edge.to_utc_datetime().strftime("%H:%M")
            for edge in (window.start, window.end)
        ]
        shard_id, batch = key_value

        # Object name: <output_path>-<window start>-<window end>-<shard id>
        filename = "-".join([self.output_path, *bounds, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as gcs_file:
            # One "<body>,<publish time>" line per message in the batch.
            for body, published_at in batch:
                line = f"{body},{published_at}\n"
                gcs_file.write(line.encode("utf-8"))
with Pipeline(options=pipeline_options) as pipeline:
    # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
    # binds the publish time returned by the Pub/Sub server for each message
    # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
    # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
    raw_messages = pipeline | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)

    # Window the messages and group them into per-shard batches.
    windowed_batches = raw_messages | "Window into" >> GroupMessagesByFixedWindows(
        window_size, num_shards
    )

    # Write every batch to its own object under `output_path`.
    windowed_batches | "Write to GCS" >> ParDo(WriteToGCS(output_path))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment