Skip to content

Instantly share code, notes, and snippets.

@rajathithan
Last active October 28, 2023 06:27
Show Gist options
  • Save rajathithan/668c2349d754142bc41cdca9338ce961 to your computer and use it in GitHub Desktop.
Save rajathithan/668c2349d754142bc41cdca9338ce961 to your computer and use it in GitHub Desktop.
Manual generation of events
import apache_beam as beam
from apache_beam.testing.test_stream import TestStream
from datetime import datetime
import pytz
class GenerateEvent(beam.PTransform):
@staticmethod
def sample_data():
return GenerateEvent()
def expand(self, input):
return (input
| TestStream()
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 2, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 3, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 4, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 5, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 6, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 7, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 8, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 9, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 10, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 11, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 12, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 13, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 14, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 15, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 16, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 17, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 18, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 19, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
.add_elements(elements=["event"], event_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to(datetime(2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).timestamp())
.advance_watermark_to_infinity())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment