Skip to content

Instantly share code, notes, and snippets.

@orian
Created May 10, 2018 12:30
Show Gist options
  • Save orian/5633f2e06ef67b459ede287dbf75f12f to your computer and use it in GitHub Desktop.
Experimenting with TestStream in the Apache Beam test framework
package eu.datainq.playground;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Experiments with {@link TestStream} to observe how windowing, watermark advances and triggers
 * shape the panes produced by a per-key sum.
 *
 * <p>Every test keys integers by parity ({@code v % 2}) and sums them per key inside fixed windows
 * of {@link #WINDOW_DUR}. Only odd values are fed in, so every assertion checks key {@code 1} in
 * the first window {@code [baseTime, baseTime + WINDOW_DUR)}.
 */
public class StarterPipelineTest {
  /** Size of the fixed windows used by every test. */
  private static final Duration WINDOW_DUR = Duration.standardMinutes(5);

  @Rule
  public final transient TestPipeline pipeline = TestPipeline.create();

  /** Event-time origin; all element timestamps are expressed as offsets from this instant. */
  Instant baseTime = new Instant(0);

  /**
   * Creates an element with value {@code v} timestamped {@code d} after {@link #baseTime}.
   *
   * @param v the element value
   * @param d offset from {@link #baseTime}
   * @return the timestamped element
   */
  public TimestampedValue<Integer> num(int v, Duration d) {
    return TimestampedValue.of(v, baseTime.plus(d));
  }

  /** Returns the first fixed window, {@code [baseTime, baseTime + WINDOW_DUR)}. */
  private IntervalWindow firstWindow() {
    return new IntervalWindow(baseTime, WINDOW_DUR);
  }

  /** Returns fixed windowing of {@link #WINDOW_DUR} with the default trigger and settings. */
  private static Window<Integer> fixedWindows() {
    return Window.<Integer>into(FixedWindows.of(WINDOW_DUR));
  }

  /**
   * Applies {@code windowing}, keys each element by {@code v % 2} and sums the values per key.
   *
   * @param input the elements to aggregate
   * @param windowing the windowing/trigger configuration to apply before keying
   * @return per-key sums, one output per fired pane
   */
  private static PCollection<KV<Integer, Integer>> sumPerParity(
      PCollection<Integer> input, Window<Integer> windowing) {
    return input
        .apply(windowing)
        .apply(WithKeys.<Integer, Integer>of(v -> v % 2).withKeyType(TypeDescriptors.integers()))
        .apply(Sum.integersPerKey());
  }

  /** A single element followed by a watermark just past the window end: on-time pane holds 1. */
  @Test
  @Category(NeedsRunner.class)
  public void simpleSum() {
    TestStream<Integer> stream =
        TestStream.create(VarIntCoder.of())
            .advanceWatermarkTo(baseTime)
            .addElements(num(1, Duration.ZERO))
            // One second past the end of the first window (previously the raw literal 1000 ms).
            .advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardSeconds(1)))
            .advanceWatermarkToInfinity();

    PCollection<KV<Integer, Integer>> got = sumPerParity(pipeline.apply(stream), fixedWindows());

    PAssert.that(got).inOnTimePane(firstWindow()).containsInAnyOrder(KV.of(1, 1));
    pipeline.run().waitUntilFinish();
  }

  /** Two elements in the same window: the on-time pane is expected to hold 1 + 3 = 4. */
  @Test
  @Category(NeedsRunner.class)
  public void simpleSum2() {
    TestStream<Integer> stream =
        TestStream.create(VarIntCoder.of())
            .advanceWatermarkTo(baseTime)
            .addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10)))
            .advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1)))
            .advanceWatermarkToInfinity();

    PCollection<KV<Integer, Integer>> got = sumPerParity(pipeline.apply(stream), fixedWindows());

    PAssert.that(got).inOnTimePane(firstWindow()).containsInAnyOrder(KV.of(1, 4));
    pipeline.run().waitUntilFinish();
  }

  /**
   * A third element arrives after a partial watermark advance that is still inside the window.
   * All panes of the window together are expected to contain 1 + 3 + 5 = 9.
   */
  @Test
  @Category(NeedsRunner.class)
  public void simpleSum3() {
    TestStream<Integer> stream =
        TestStream.create(VarIntCoder.of())
            .advanceWatermarkTo(baseTime)
            .addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10)))
            // Still before the end of the first window, so the next element is not behind it.
            .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(1)))
            .addElements(num(5, Duration.standardSeconds(30)))
            .advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1)))
            .advanceWatermarkToInfinity();

    PCollection<KV<Integer, Integer>> got = sumPerParity(pipeline.apply(stream), fixedWindows());

    PAssert.that(got).inWindow(firstWindow()).containsInAnyOrder(KV.of(1, 9));
    pipeline.run().waitUntilFinish();
  }

  /**
   * Element 5 (event time 3 min) is added only after the watermark reaches the window end.
   *
   * <p>The window allows 2 minutes of lateness, has late firings, and accumulates panes. The test
   * expects the on-time pane to hold 1 + 3 = 4 and the final pane to hold 1 + 3 + 5 = 9.
   */
  @Test
  @Category(NeedsRunner.class)
  public void simpleSum4() {
    TestStream<Integer> stream =
        TestStream.create(VarIntCoder.of())
            .advanceWatermarkTo(baseTime)
            .addElements(num(1, Duration.ZERO), num(3, Duration.standardSeconds(10)))
            .advanceWatermarkTo(baseTime.plus(WINDOW_DUR))
            .addElements(num(5, Duration.standardMinutes(3)))
            .advanceWatermarkTo(baseTime.plus(WINDOW_DUR).plus(Duration.standardMinutes(1)))
            .advanceWatermarkToInfinity();

    Window<Integer> windowing =
        fixedWindows()
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(2))))
            .withAllowedLateness(Duration.standardMinutes(2))
            .accumulatingFiredPanes();

    PCollection<KV<Integer, Integer>> got = sumPerParity(pipeline.apply(stream), windowing);

    IntervalWindow window = firstWindow();
    PAssert.that(got).inOnTimePane(window).containsInAnyOrder(KV.of(1, 4));
    PAssert.that(got).inFinalPane(window).containsInAnyOrder(KV.of(1, 9));
    pipeline.run().waitUntilFinish();
  }

  /**
   * The same kind of aggregation over a bounded {@link Create} source instead of a
   * {@link TestStream}, with early and late firings configured. The test expects the on-time pane
   * to hold 1.
   */
  @Test
  @Category(NeedsRunner.class)
  public void batchMode() {
    Window<Integer> windowing =
        fixedWindows()
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.standardMinutes(5))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(5)))
                    .withLateFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(10))));

    PCollection<KV<Integer, Integer>> got =
        sumPerParity(pipeline.apply(Create.timestamped(num(1, Duration.ZERO))), windowing);

    PAssert.that(got).inOnTimePane(firstWindow()).containsInAnyOrder(KV.of(1, 1));
    pipeline.run().waitUntilFinish();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment