@x
Created August 1, 2021 22:03
Apache Beam Summit - Windowing Example
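package io.oden.laser.common.transforms;

import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.joda.time.Duration;

public class Windows {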
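  /**
   * Builds a sliding window that tries to fire promptly without needlessly
   * retriggering. It is designed to handle the following cases:
   *
   * <ol>
   *   <li>All data arrives on time. The watermark at any given moment is roughly the
   *       current time.
   *   <li>Data is being backfilled for some subset of metrics, and the watermark is
   *       already ahead of the event time of the windows for those metrics.
   *   <li>Data is being backfilled for some subset of metrics, but the watermark is
   *       stuck earlier than the event time for most metrics.
   *   <li>Any of cases 1, 2, or 3, but where late data arrives due to some
   *       uncontrollable situation (e.g. a single metric for a pane gets stuck in
   *       Pub/Sub for days and is then released).
   * </ol>
   *
   * <p>Additional reading:
   *
   * <ul>
   *   <li>https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
   *   <li>https://beam.apache.org/documentation/programming-guide/#windowing
   *   <li>https://issues.apache.org/jira/browse/BEAM-644
   * </ul>
   *
   * @param <T> element type of the windowed collection
   * @param windowSize length of each sliding window; also the processing-time delay
   *     used before a late pane fires
   * @param windowSlide how often a new window starts
   * @param earlyFire processing time after the first element in a pane before an
   *     early pane fires
   * @param allowedLateness how long, in event time past the end of a window, panes
   *     are retained and late data is accepted
   * @param offset shift applied to every window boundary
   */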
  public static <T> Window<T> earlyAndLateFireSlidingWindow(
      Duration windowSize,
      Duration windowSlide,
      Duration earlyFire,
      Duration allowedLateness,
      Duration offset) {
    return Window.<T>into(
            SlidingWindows.of(windowSize)
                .every(windowSlide)
                // Sliding windows of a configurable size. The offset (default 0)
                // moves every window forward to [start + offset, end + offset) to
                // align with Heroic's exclusive start and inclusive end. The
                // commented-out variant below also subtracted a buffer
                // (deltasumBuffer, default 0, not defined in this snippet) that
                // leaves room at the end of the window for calculating the last
                // deltasum value (rollups):
                // .withOffset(windowSize.minus(deltasumBuffer).plus(offset)))
                .withOffset(offset))
        // This window fires (materializes the accumulated data) at least once.
        // Each firing emits all data accumulated in the window so far, as
        // opposed to only the new data since the last firing.
        .accumulatingFiredPanes()
        .triggering(
            // The primary way this window fires is when the watermark (tracked
            // upstream as the estimated minimum event time of the backlog) passes
            // the end of the window. This is the only firing behavior for case 1
            // and the first firing behavior for cases 2 and 4.
            AfterWatermark.pastEndOfWindow()
                // In case 3, the user should not have to wait for the watermark to
                // catch up to get their data, so a configurable threshold lets the
                // window fire early once that much processing time has passed since
                // the first element we saw in the pane.
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFire))
                // In case 2, all elements are considered "late", and we don't want
                // to fire once for every element added to the pane (e.g. 300 times
                // for a 5-minute window of per-second data). Instead, a late pane
                // fires only after new elements have arrived and a full windowSize
                // of processing time has passed since the first of them. The
                // assumption is that backfilling a pane is typically faster than
                // filling it on time. This introduces a small but acceptable lag in
                // case 4.
                .withLateFirings(
                    AfterAll.of(
                        AfterPane.elementCountAtLeast(1),
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowSize))))
        // In case 3, once the watermark has caught up, the default behavior would
        // be to fire the window again. This limits the on-time firing to panes
        // that received new data between the early firing and the on-time firing.
        .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
        // How long (in event time, past the end of the window) we retain panes and
        // accept late data.
        .withAllowedLateness(allowedLateness);
  }
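
  /**
   * Equivalent to {@link #earlyAndLateFireSlidingWindow(Duration, Duration, Duration,
   * Duration, Duration)} with a zero offset.
   */
  public static <T> Window<T> earlyAndLateFireSlidingWindow(
      Duration windowSize, Duration windowSlide, Duration earlyFire, Duration allowedLateness) {
    return earlyAndLateFireSlidingWindow(
        windowSize, windowSlide, earlyFire, allowedLateness, Duration.ZERO);
  }
}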
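
The offset comment in the code says each window is moved forward to [start + offset, end + offset). The plain-Java sketch below has no Beam dependency. It reproduces that assignment rule, assuming the form documented for Beam's `SlidingWindows.withOffset`: [N * period + offset, N * period + offset + size). `SlidingWindowSketch` and `windowsFor` are hypothetical helpers, not Beam API. The sketch shows how a 1 ms offset turns the first millisecond of one window into the last millisecond of another, so that the same window can be read with an exclusive start and an inclusive end.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, dependency-free sketch of sliding-window assignment with an offset.
 * Assumes windows are the half-open intervals [N * period + offset, N * period + offset + size).
 */
final class SlidingWindowSketch {

  /** Returns every window, as {start, end} pairs in millis, whose [start, end) contains timestampMillis. */
  static List<long[]> windowsFor(
      long timestampMillis, long sizeMillis, long periodMillis, long offsetMillis) {
    List<long[]> windows = new ArrayList<>();
    // Latest window start at or before the timestamp that lies on the offset grid.
    long lastStart = timestampMillis - Math.floorMod(timestampMillis - offsetMillis, periodMillis);
    // Walk back one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
      windows.add(new long[] {start, start + sizeMillis});
    }
    return windows;
  }

  public static void main(String[] args) {
    long size = 5 * 60_000L; // 5-minute windows
    long period = 60_000L;   // a new window every minute
    // Offset 0: t = 120000 is the first millisecond of [120000, 420000).
    for (long[] w : windowsFor(120_000L, size, period, 0L)) {
      System.out.println("offset 0: [" + w[0] + ", " + w[1] + ")");
    }
    // Offset 1 ms: t = 120000 is the last millisecond of [-179999, 120001),
    // which reads as (-180000, 120000] with exclusive start and inclusive end.
    for (long[] w : windowsFor(120_000L, size, period, 1L)) {
      System.out.println("offset 1: [" + w[0] + ", " + w[1] + ")");
    }
  }
}
```

Each offset yields size / period = 5 overlapping windows for the same timestamp. Only the boundaries move.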
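
The late-firing comment argues that `AfterAll.of(AfterPane.elementCountAtLeast(1), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowSize))` avoids one firing per late element. The sketch below is a simplified, hypothetical model of that trigger (`LateFiringModel` is not part of Beam). Like `withLateFirings`, it re-arms the trigger after each firing. It tracks only the processing-time arrivals of late elements and ignores watermarks, bundling, and allowed lateness, so it illustrates the counting argument rather than Beam's exact behavior.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of a repeated late trigger that fires once a pane has at least one
 * element AND delayMillis of processing time has passed since that pane's first element.
 */
final class LateFiringModel {

  /** Given sorted processing-time arrivals (ms) of late elements, returns when each late pane fires. */
  static List<Long> lateFiringTimes(long[] arrivalsMillis, long delayMillis) {
    List<Long> fireTimes = new ArrayList<>();
    int i = 0;
    while (i < arrivalsMillis.length) {
      // The first element of a new pane satisfies elementCountAtLeast(1) and starts the delay.
      long fireAt = arrivalsMillis[i] + delayMillis;
      fireTimes.add(fireAt);
      // Every element arriving before the timer goes off is folded into the same pane.
      while (i < arrivalsMillis.length && arrivalsMillis[i] < fireAt) {
        i++;
      }
    }
    return fireTimes;
  }

  public static void main(String[] args) {
    long windowSize = 5 * 60_000L;
    // Assumed backfill pattern: 300 late elements arriving 100 ms apart.
    long[] arrivals = new long[300];
    for (int k = 0; k < arrivals.length; k++) {
      arrivals[k] = k * 100L;
    }
    List<Long> fires = lateFiringTimes(arrivals, windowSize);
    System.out.println(arrivals.length + " late elements -> " + fires.size() + " late firing(s) at " + fires);
  }
}
```

Under this model, the backfilled pane fires once at 300000 ms instead of 300 times. A lone straggler that arrives later starts its own pane and fires `windowSize` after it arrives. That delay is the small case-4 lag the comment accepts.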