Skip to content

Instantly share code, notes, and snippets.

@jaketf
Created April 29, 2020 00:16
Show Gist options
  • Save jaketf/d3c2e70dde781bbb0ef1993446e34b71 to your computer and use it in GitHub Desktop.
Save jaketf/d3c2e70dde781bbb0ef1993446e34b71 to your computer and use it in GitHub Desktop.
[BEAM-9847] A Single Element's Outputs CANNOT be eagerly output via triggering
/** Utility DoFn for the purpose of this test (defined in a test util class)
public static class EmitNSlowlyFn extends DoFn<Integer, Integer> {
@ProcessElement
public void slowlyEmit(ProcessContext context) throws InterruptedException {
Integer num = context.element();
int i = 0;
while (i < num) { // output elements at a rate of ~ 2 elements per second
Sleeper.DEFAULT.sleep(500);
context.output(i++);
}
}
} */
/**
 * Reproduction test for BEAM-9847: checks whether the outputs produced by a single input element
 * can be emitted eagerly by means of an early-firing trigger.
 *
 * <p>The pipeline seeds one element ({@code 10}), expands it through {@code EmitNSlowlyFn},
 * applies a global window with a processing-time early-firing trigger followed by a reshuffle,
 * then re-stamps every element with the wall-clock time at which it is observed and counts the
 * elements falling into each one-second fixed window.
 */
@RunWith(JUnit4.class)
public class EarlyFiringsTest {

  /** Test pipeline managed by JUnit as a rule. */
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  /**
   * Asserts that no one-second window of re-stamped elements holds more than two elements.
   *
   * <p>The assertion is evaluated by {@code PAssert} when the pipeline runs; the method blocks
   * until the pipeline has finished.
   */
  @Test
  public void test_early_results() {
    PCollection<Integer> seed =
        pipeline.apply(
            "seed with a single input element", Create.of(Collections.singletonList(10)));

    PCollection<Integer> slowlyEmitted =
        seed.apply("slowly emit elements", ParDo.of(new EmitNSlowlyFn()));

    // [Start] logic copied from HL7v2IO.ListHL7v2Messages for early event emission.
    // Global window that fires early, one second of processing time after the first element
    // of a pane, discarding panes that have already fired.
    Window<Integer> earlyFiringGlobalWindow =
        Window.<Integer>into(new GlobalWindows())
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(1))))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO);

    PCollection<Integer> reshuffled =
        slowlyEmitted
            .apply("emit early global windowing", earlyFiringGlobalWindow)
            .apply(Reshuffle.<Integer>viaRandomKey());
    // [End] logic copied from HL7v2IO.ListHL7v2Messages for early event emission.

    // Replace each element's timestamp with "now" so the fixed windows below group elements
    // by the moment they reached this step.
    PCollection<Integer> restamped =
        reshuffled.apply(WithTimestamps.of((Integer element) -> Instant.now()));

    PCollection<Integer> windowedPerSecond =
        restamped.apply(
            "processing time windowing",
            Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(1))));

    PCollection<Long> countsPerWindow =
        windowedPerSecond.apply(
            "Count per processing time window",
            Combine.globally(Count.<Integer>combineFn()).withoutDefaults());

    PAssert.that(countsPerWindow)
        .satisfies(
            (Iterable<Long> windowCounts) -> {
              for (Long windowCount : windowCounts) {
                String failureMessage =
                    String.format(
                        "each window should contain at most 2 elements if emitting early got %s",
                        windowCount);
                Assert.assertFalse(failureMessage, windowCount > 2);
              }
              return null;
            });

    pipeline.run().waitUntilFinish();
  }
}
/** RESULTS CONFIRM THAT A SINGLE ELEMENT'S OUTPUTS CANNOT BE OUTPUT EAGERLY WITH TRIGGERING
* Count per processing time window/Values/Values/Map/ParMultiDo(Anonymous).output: each window should contain at most 2 elements if emitting early got 10
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment