Skip to content

Instantly share code, notes, and snippets.

@davideanastasia
Last active September 5, 2019 06:23
Show Gist options
  • Save davideanastasia/0bf8e546110332ca78e59e6497bab1c0 to your computer and use it in GitHub Desktop.
Save davideanastasia/0bf8e546110332ca78e59e6497bab1c0 to your computer and use it in GitHub Desktop.
spatial_beam_04.java
/**
 * Exercises {@code TiledSessions} configured with {@code Strategy.SPACE_AND_TIME},
 * a 30 second gap duration and resolution 9.
 *
 * <p>Three records sharing key {@code 1} are emitted at t=0s, t=10s and t=20s. The first
 * and third sit at roughly (1, 1); the second sits at roughly (2, 2). The assertions expect:
 * <ul>
 *   <li>the t=0s and t=20s records grouped together in the window
 *       {@code ("897541ad5a7ffff", 0s, 50s)};</li>
 *   <li>the t=10s record alone in the window {@code ("89756e45da7ffff", 10s, 40s)}.</li>
 * </ul>
 *
 * <p>NOTE(review): the window id strings look like tile identifiers derived from each
 * record's point -- confirm against {@code TiledSessions} / {@code TiledIntervalWindow}.
 */
@Test
public void testSpaceAndTimeSplit() throws Exception {
    // Event times shared by the input elements and the expected output.
    Instant t0 = Instant.ofEpochSecond(0);
    Instant t10 = Instant.ofEpochSecond(10);
    Instant t20 = Instant.ofEpochSecond(20);
    Duration sessionGap = Duration.standardSeconds(30);

    // Source: three timestamped records, then the watermark jumps to infinity.
    TestStream<GenericRecord> events = TestStream.create(GenericRecord.GenericRecordCoder.of())
        .addElements(
            TimestampedValue.of(new GenericRecord(1, t0, new Point(1.f, 1.f)), t0),
            TimestampedValue.of(new GenericRecord(1, t10, new Point(2.000001f, 2.000001f)), t10),
            TimestampedValue.of(new GenericRecord(1, t20, new Point(1.000002f, 1.000002f)), t20)
        )
        .advanceWatermarkToInfinity();

    // Key every record by its id.
    PCollection<KV<Integer, GenericRecord>> keyed = pipeline.apply(events)
        .setCoder(GenericRecord.GenericRecordCoder.of())
        .apply(WithKeys.of(GenericRecord::getId))
        .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), GenericRecord.GenericRecordCoder.of()));

    // Windowing under test: fire once when the watermark passes the end of the window,
    // with no allowed lateness and discarding fired panes.
    Window<KV<Integer, GenericRecord>> tiledSessionWindowing =
        Window.<KV<Integer, GenericRecord>>into(
                TiledSessions.<GenericRecord>withGapDuration(sessionGap)
                    .withStrategy(TiledSessions.Strategy.SPACE_AND_TIME)
                    .withResolution(9))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes();

    PCollection<KV<Integer, Iterable<GenericRecord>>> grouped = keyed
        .apply(tiledSessionWindowing)
        .apply(GroupByKey.create())
        .apply(ParDo.of(new NoOpFn<>()));

    // Expectation 1: the records at t=0s and t=20s end up together in the [0s, 50s) window.
    TiledIntervalWindow nearOriginWindow =
        new TiledIntervalWindow("897541ad5a7ffff", t0, Instant.ofEpochSecond(50));
    PAssert
        .that(grouped)
        .inWindow(nearOriginWindow)
        .containsInAnyOrder(KV.of(1, Lists.newArrayList(
            new GenericRecord(1, t0, new Point(1.f, 1.f)),
            new GenericRecord(1, t20, new Point(1.000002f, 1.000002f))
        )));

    // Expectation 2: the record at t=10s is alone in the [10s, 40s) window.
    TiledIntervalWindow offsetWindow =
        new TiledIntervalWindow("89756e45da7ffff", t10, Instant.ofEpochSecond(40));
    PAssert
        .that(grouped)
        .inWindow(offsetWindow)
        .containsInAnyOrder(KV.of(1, Lists.newArrayList(
            new GenericRecord(1, t10, new Point(2.000001f, 2.000001f))
        )));

    // Execute the pipeline and block until it completes so the PAssert checks are evaluated.
    pipeline.run().waitUntilFinish();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment