Skip to content

Instantly share code, notes, and snippets.

@rajathithan
Last active October 28, 2023 06:31
Show Gist options
  • Save rajathithan/31e7cc7d2d73712ed767ae4a89aa6692 to your computer and use it in GitHub Desktop.
Save rajathithan/31e7cc7d2d73712ed767ae4a89aa6692 to your computer and use it in GitHub Desktop.
Trigger early results based on the element count, before the watermark passes the end of the window
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.transforms.trigger import AfterCount, AfterWatermark
from apache_beam.transforms.window import FixedWindows
from apache_beam.utils.timestamp import Duration

from log_elements import LogElements
# NOTE(review): GenerateEvent is used below but is not imported in this file;
# confirm which local helper module provides it and import it here.
class CountEventsWithEarlyTrigger(beam.PTransform):
    """Count elements per one-day fixed window, emitting early panes.

    Elements are assigned to fixed windows of 24 * 60 * 60 seconds. The
    trigger is ``AfterWatermark`` with ``AfterCount(1)`` as its early
    trigger, so a pane may be emitted as soon as a new element arrives,
    ahead of the on-time pane at the watermark. Panes use DISCARDING
    accumulation, so each emitted count covers only the elements seen since
    the previous firing. Allowed lateness is zero.
    """

    def expand(self, events):
        windowed = events | beam.WindowInto(
            FixedWindows(24 * 60 * 60),  # one day, expressed in seconds
            trigger=AfterWatermark(early=AfterCount(1)),
            accumulation_mode=AccumulationMode.DISCARDING,
            allowed_lateness=Duration(seconds=0),
        )
        # CombineGlobally cannot emit a default value outside the global
        # window, hence without_defaults().
        return windowed | beam.CombineGlobally(
            beam.combiners.CountCombineFn()
        ).without_defaults()
# Run in streaming mode: without it the early trigger cannot produce more
# than one firing per window.
options = PipelineOptions()
standard_options = options.view_as(StandardOptions)
standard_options.streaming = True

with beam.Pipeline(options=options) as p:
    events = p | GenerateEvent.sample_data()
    counts = events | CountEventsWithEarlyTrigger()
    counts | LogElements(with_window=True)
# Expected output recorded alongside the original snippet. (Previously an
# unterminated ''' string, which was a SyntaxError.)
# NOTE(review): these panes use 5-second windows, whereas
# CountEventsWithEarlyTrigger windows into 1-day FixedWindows with an early
# AfterCount(1) trigger; this output may have been captured from a different
# exercise. Re-run the pipeline and confirm before relying on it.
#
# Answer:
# 4, window(start=2021-03-01T00:00:00Z, end=2021-03-01T00:00:05Z)
# 5, window(start=2021-03-01T00:00:05Z, end=2021-03-01T00:00:10Z)
# 5, window(start=2021-03-01T00:00:10Z, end=2021-03-01T00:00:15Z)
# 5, window(start=2021-03-01T00:00:15Z, end=2021-03-01T00:00:20Z)
# 1, window(start=2021-03-01T00:00:20Z, end=2021-03-01T00:00:25Z)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment