Skip to content

Instantly share code, notes, and snippets.

@rajathithan
Last active October 28, 2023 06:40
Show Gist options
  • Save rajathithan/ebb6f9fc755129ac15f5150f4f179727 to your computer and use it in GitHub Desktop.
Save rajathithan/ebb6f9fc755129ac15f5150f4f179727 to your computer and use it in GitHub Desktop.
Dataflow event trigger with accumulation mode (streaming)
import apache_beam as beam
from generate_event import GenerateEvent
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AccumulationMode
from apache_beam.utils.timestamp import Duration
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from log_elements import LogElements
class CountEventsWithAccumulating(beam.PTransform):
    """Count elements per fixed window, re-emitting the running total on each firing.

    The input is assigned to fixed (tumbling) event-time windows. Inside
    ``AfterWatermark``, the early ``AfterCount(1)`` trigger fires a pane as
    elements arrive. The watermark trigger fires one more (on-time) pane once
    the watermark passes the end of the window. With
    ``AccumulationMode.ACCUMULATING``, each pane carries the total count seen
    so far in the window, not just the delta since the previous pane.

    Args:
        label: Optional transform label, forwarded to ``beam.PTransform``.
            Kept as the first positional parameter so existing callers that
            pass a label positionally keep working.
        window_size_seconds: Length of each fixed window, in seconds.
            Defaults to one day (86400 s).
        allowed_lateness_seconds: How long after the watermark passes the end
            of a window that late data is still accepted. Defaults to 0.
    """

    def __init__(self, label=None, *, window_size_seconds=24 * 60 * 60,
                 allowed_lateness_seconds=0):
        super().__init__(label)
        self.window_size_seconds = window_size_seconds
        self.allowed_lateness_seconds = allowed_lateness_seconds

    def expand(self, events):
        """Window ``events`` and return a PCollection of per-window counts."""
        windowed = events | beam.WindowInto(
            FixedWindows(self.window_size_seconds),
            # Early panes per arriving element, then an on-time pane at the
            # watermark.
            trigger=AfterWatermark(early=AfterCount(1)),
            # Each pane reports the cumulative count for the window.
            accumulation_mode=AccumulationMode.ACCUMULATING,
            allowed_lateness=Duration(seconds=self.allowed_lateness_seconds))
        # without_defaults(): CombineGlobally cannot emit a default value when
        # the input is not in the global window, so defaults must be disabled.
        return windowed | beam.CombineGlobally(
            beam.combiners.CountCombineFn()).without_defaults()
# Run CountEventsWithAccumulating over the sample events as a streaming
# pipeline and log each emitted count together with its window.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | GenerateEvent.sample_data()
     | CountEventsWithAccumulating()
     | LogElements(with_window=True))

# Expected output. Per the listing below, the sample data appears to hold 20
# events, all in the 2021-03-01 window. Each early pane re-emits the running
# count (1..20). The final "20" is the on-time pane fired when the watermark
# passes the end of the window.
#
# Answer:
# 1, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 2, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 3, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 4, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 5, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 6, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 7, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 8, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 9, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 10, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 11, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 12, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 13, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 14, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 15, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 16, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 17, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 18, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 19, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 20, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
# 20, window(start=2021-03-01T00:00:00Z, end=2021-03-02T00:00:00Z)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment