/ApacheBeamAggregations.java Secret
Created
December 10, 2017 20:29
ApacheBeamAggregations.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.beam.sdk.coders.KvCoder; | |
import org.apache.beam.sdk.coders.StringUtf8Coder; | |
import org.apache.beam.sdk.coders.VarIntCoder; | |
import org.apache.beam.sdk.testing.PAssert; | |
import org.apache.beam.sdk.testing.TestPipeline; | |
import org.apache.beam.sdk.testing.TestStream; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.transforms.PTransform; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import org.apache.beam.sdk.transforms.Sum; | |
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; | |
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; | |
import org.apache.beam.sdk.transforms.windowing.BoundedWindow; | |
import org.apache.beam.sdk.transforms.windowing.FixedWindows; | |
import org.apache.beam.sdk.transforms.windowing.Repeatedly; | |
import org.apache.beam.sdk.transforms.windowing.Trigger; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.ValueInSingleWindow; | |
import org.joda.time.Duration; | |
import org.joda.time.Instant; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.junit.runners.JUnit4; | |
@RunWith(JUnit4.class) | |
public class TriggersTest { | |
@Rule public final transient TestPipeline pipeline = TestPipeline.create(); | |
private Instant t0 = Instant.parse("2017-01-01T00:00:00Z"); | |
private Instant t1 = t0.plus(Duration.standardMinutes(1)); | |
private Instant t2 = t0.plus(Duration.standardMinutes(2)); | |
private Instant t3 = t0.plus(Duration.standardMinutes(3)); | |
private Instant t4 = t0.plus(Duration.standardMinutes(4)); | |
private Instant t10 = t0.plus(Duration.standardMinutes(10)); | |
@Test | |
public void testSumPerKey() { | |
Duration ONE_MINUTE = Duration.standardMinutes(1); | |
Duration TWO_MINUTES = Duration.standardMinutes(2); | |
Duration TEN_MINUTES = Duration.standardMinutes(10); | |
Duration ZERO = Duration.ZERO; | |
TestStream<KV<String, Integer>> createInput = | |
TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())) | |
.advanceWatermarkTo(t0) | |
.addElements(KV.of("a", 1)) | |
.advanceProcessingTime(TWO_MINUTES) | |
.addElements(KV.of("a", 1)) | |
.advanceProcessingTime(TWO_MINUTES) | |
.advanceWatermarkTo(t1) | |
.addElements(KV.of("a", 1)) | |
.advanceProcessingTime(TWO_MINUTES) | |
.advanceWatermarkToInfinity(); | |
PCollection<KV<String, Integer>> input = pipeline.apply(createInput); | |
Trigger trigger = | |
Repeatedly.forever( | |
AfterProcessingTime.pastFirstElementInPane() | |
// Speculative every ONE_MINUTE | |
.plusDelayOf(ONE_MINUTE)) | |
// final result past watermark | |
.orFinally(AfterWatermark.pastEndOfWindow()); | |
PCollection<KV<String, Integer>> oneMinSum = | |
input | |
.apply( | |
"OneMinWindows", | |
Window.<KV<String, Integer>>into(FixedWindows.of(ONE_MINUTE)) | |
.triggering(trigger) | |
.accumulatingFiredPanes() | |
.withAllowedLateness(ZERO)) | |
.apply("OneMinSum", Sum.integersPerKey()); | |
PCollection<KV<String, Integer>> onHourSums = | |
oneMinSum | |
.apply( | |
"SlidingWindows", | |
Window.<KV<String, Integer>>into(FixedWindows.of(TEN_MINUTES)) | |
.triggering(trigger) | |
.accumulatingFiredPanes() | |
.withAllowedLateness(ZERO)) | |
.apply("OneHourSum", Sum.integersPerKey()); | |
// printAll("OneMin", oneMinSum); | |
printAll("OneHourSum", onHourSums); | |
pipeline.run(); | |
} | |
public static <K, V> void printAll(String name, PCollection<KV<K, V>> input) { | |
PCollection<ValueInSingleWindow<KV<K, V>>> reified = | |
input.apply("Reify" + name, reifyWindows()); | |
PAssert.that(reified) | |
.satisfies( | |
it -> { | |
System.out.println("------------------------------------------"); | |
it.forEach( | |
vw -> { | |
K key = vw.getValue().getKey(); | |
V sum = vw.getValue().getValue(); | |
System.out.println("key=" + key + " value=" + sum); | |
System.out.println(" window=" + vw.getWindow()); | |
System.out.println(" pane=" + vw.getPane()); | |
// System.out.println(" timestamp=" + vw.getTimestamp()); | |
}); | |
return null; | |
}); | |
} | |
public static <T> PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> reifyWindows() { | |
return new ReifyWindow<>(); | |
} | |
private static class ReifyWindow<T> | |
extends PTransform<PCollection<T>, PCollection<ValueInSingleWindow<T>>> { | |
@Override | |
public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) { | |
return input | |
.apply( | |
ParDo.of( | |
new DoFn<T, ValueInSingleWindow<T>>() { | |
@ProcessElement | |
public void processElement(ProcessContext c, BoundedWindow window) { | |
c.outputWithTimestamp( | |
ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()), | |
c.timestamp()); | |
} | |
})) | |
.setCoder( | |
ValueInSingleWindow.Coder.of( | |
input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment