// GitHub Gist: ApacheBeamAggregations.java (created December 10, 2017 20:29).
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Exploratory test showing how triggers behave when a one-minute fixed-window per-key sum feeds a
 * ten-minute fixed-window per-key sum.
 *
 * <p>The test does not assert on the sums. It prints the key, value, window and pane of every
 * element of the final stage so the emitted panes can be inspected by hand.
 */
@RunWith(JUnit4.class)
public class TriggersTest {
  private static final Duration ONE_MINUTE = Duration.standardMinutes(1);
  private static final Duration TWO_MINUTES = Duration.standardMinutes(2);
  private static final Duration TEN_MINUTES = Duration.standardMinutes(10);

  /** Event-time origin of the synthetic stream. */
  private static final Instant T0 = Instant.parse("2017-01-01T00:00:00Z");

  /** One minute after {@link #T0}; the second watermark position used by the stream. */
  private static final Instant T1 = T0.plus(ONE_MINUTE);

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  /**
   * Feeds three {@code ("a", 1)} elements through two chained windowed sums and prints what the
   * second sum emits.
   */
  @Test
  public void testSumPerKey() {
    // Processing time advances two minutes after every element, i.e. past the one-minute
    // speculative delay of the trigger below.
    TestStream<KV<String, Integer>> events =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
            .advanceWatermarkTo(T0)
            .addElements(KV.of("a", 1))
            .advanceProcessingTime(TWO_MINUTES)
            .addElements(KV.of("a", 1))
            .advanceProcessingTime(TWO_MINUTES)
            .advanceWatermarkTo(T1)
            .addElements(KV.of("a", 1))
            .advanceProcessingTime(TWO_MINUTES)
            .advanceWatermarkToInfinity();
    PCollection<KV<String, Integer>> input = pipeline.apply(events);

    Trigger trigger =
        Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    // Speculative result every ONE_MINUTE of processing time.
                    .plusDelayOf(ONE_MINUTE))
            // Final result once the watermark passes the end of the window.
            .orFinally(AfterWatermark.pastEndOfWindow());

    PCollection<KV<String, Integer>> oneMinSums =
        input
            .apply("OneMinWindows", accumulatingFixedWindows(ONE_MINUTE, trigger))
            .apply("OneMinSum", Sum.integersPerKey());

    // The same trigger is deliberately reused for the coarser, downstream aggregation.
    PCollection<KV<String, Integer>> tenMinSums =
        oneMinSums
            .apply("TenMinWindows", accumulatingFixedWindows(TEN_MINUTES, trigger))
            .apply("TenMinSum", Sum.integersPerKey());

    // To inspect the intermediate stage as well, additionally call printAll("OneMin", oneMinSums).
    printAll("TenMinSum", tenMinSums);

    pipeline.run();
  }

  /**
   * Builds the windowing step shared by both aggregation stages: fixed windows of the given size,
   * accumulating fired panes, zero allowed lateness.
   *
   * @param size length of each fixed window
   * @param trigger trigger deciding when panes are emitted
   * @return the configured {@link Window} transform
   */
  private static Window<KV<String, Integer>> accumulatingFixedWindows(
      Duration size, Trigger trigger) {
    return Window.<KV<String, Integer>>into(FixedWindows.of(size))
        .triggering(trigger)
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.ZERO);
  }

  /**
   * Prints key, value, window and pane of every element of {@code input} to standard output.
   *
   * <p>{@link PAssert} is used only as a hook to get at the collection's contents: the callback
   * checks nothing and always returns {@code null}.
   *
   * @param name suffix for the name of the reify step ({@code "Reify" + name})
   * @param input keyed collection whose elements should be printed
   * @param <K> key type
   * @param <V> value type
   */
  public static <K, V> void printAll(String name, PCollection<KV<K, V>> input) {
    PCollection<ValueInSingleWindow<KV<K, V>>> reified =
        input.apply("Reify" + name, reifyWindows());
    PAssert.that(reified)
        .satisfies(
            elements -> {
              System.out.println("------------------------------------------");
              for (ValueInSingleWindow<KV<K, V>> vw : elements) {
                K key = vw.getValue().getKey();
                V sum = vw.getValue().getValue();
                System.out.println("key=" + key + " value=" + sum);
                System.out.println(" window=" + vw.getWindow());
                System.out.println(" pane=" + vw.getPane());
                // vw.getTimestamp() is available too if the element timestamp is of interest.
              }
              return null;
            });
  }

  /**
   * Returns a transform that wraps every element together with its timestamp, window and pane.
   *
   * @param <T> element type of the input collection
   * @return a new {@link ReifyWindow} transform
   */
  public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> reifyWindows() {
    return new ReifyWindow<>();
  }

  /**
   * Turns each element into a {@link ValueInSingleWindow} carrying the element's value, timestamp,
   * window and pane, so that this metadata becomes part of the data.
   */
  private static class ReifyWindow<T>
      extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> {
    @Override
    public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {
      return input
          .apply(
              ParDo.of(
                  new DoFn<T, ValueInSingleWindow<T>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c, BoundedWindow window) {
                      // output() keeps the timestamp of the element being processed.
                      c.output(
                          ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()));
                    }
                  }))
          // The output coder is built from the input's value coder and its window coder.
          .setCoder(
              ValueInSingleWindow.Coder.of(
                  input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()));
    }
  }
}
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment