Last active
October 28, 2023 06:31
-
-
Save rajathithan/31e7cc7d2d73712ed767ae4a89aa6692 to your computer and use it in GitHub Desktop.
Trigger results based on the element count, before the watermark reaches the end of the window
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from apache_beam.transforms.trigger import AccumulationMode | |
from apache_beam.utils.timestamp import Duration | |
from log_elements import LogElements | |
class CountEventsWithEarlyTrigger(beam.PTransform):
    """Count events per fixed one-day window, firing early on element arrival.

    The input is windowed into one-day fixed windows. The trigger is
    ``AfterWatermark(early=AfterCount(1))``, so panes may be emitted before
    the watermark reaches the end of the window as elements arrive, in
    addition to the firing at the watermark. With
    ``AccumulationMode.DISCARDING`` each pane's count covers only the
    elements seen since the previous firing rather than a running total.
    """

    def expand(self, events):
        """Apply the windowing/trigger configuration and count the elements.

        Args:
            events: The input PCollection of events to be counted.

        Returns:
            A PCollection of per-pane element counts.
        """
        return (
            events
            | beam.WindowInto(
                FixedWindows(1 * 24 * 60 * 60),  # 1 Day Window
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.DISCARDING,
                # Zero allowed lateness: no extra time is granted to data
                # arriving after the watermark passes the window end.
                allowed_lateness=Duration(seconds=0))
            # without_defaults() suppresses the default value that
            # CombineGlobally would otherwise produce for empty input.
            | beam.CombineGlobally(
                beam.combiners.CountCombineFn()).without_defaults())
# Build the pipeline options and switch the pipeline to streaming mode.
options = PipelineOptions()
# Required to get multiple trigger firing outputs
options.view_as(StandardOptions).streaming = True

# Run the pipeline: generate the sample events, count them with the
# early-firing trigger, and log each result together with its window.
with beam.Pipeline(options=options) as p:
    (p
     | GenerateEvent.sample_data()
     | CountEventsWithEarlyTrigger()
     | LogElements(with_window=True))
'''
Answer:
4, window(start=2021-03-01T00:00:00Z, end=2021-03-01T00:00:05Z)
5, window(start=2021-03-01T00:00:05Z, end=2021-03-01T00:00:10Z)
5, window(start=2021-03-01T00:00:10Z, end=2021-03-01T00:00:15Z)
5, window(start=2021-03-01T00:00:15Z, end=2021-03-01T00:00:20Z)
1, window(start=2021-03-01T00:00:20Z, end=2021-03-01T00:00:25Z)
'''
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment