Created
April 29, 2020 00:16
-
-
Save jaketf/d3c2e70dde781bbb0ef1993446e34b71 to your computer and use it in GitHub Desktop.
[BEAM-9847] Single Elements Outputs CANNOT be eagerly output via triggering
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Utility DoFn for the purpose of this test (defined in a test util class):
public static class EmitNSlowlyFn extends DoFn<Integer, Integer> {
  @ProcessElement
  public void slowlyEmit(ProcessContext context) throws InterruptedException {
    Integer num = context.element();
    int i = 0;
    while (i < num) { // output elements at a rate of ~2 elements per second
      Sleeper.DEFAULT.sleep(500);
      context.output(i++);
    }
  }
} */
@RunWith(JUnit4.class) | |
public class EarlyFiringsTest { | |
@Rule public final transient TestPipeline pipeline = TestPipeline.create(); | |
@Test | |
public void test_early_results() { | |
PCollection<Long> out = pipeline | |
.apply("seed with a single input element", Create.of(Collections.singletonList(10))) | |
.apply("slowly emit elements", ParDo.of(new EmitNSlowlyFn())) | |
// [Start] logic copied from HL7v2IO.ListHL7v2Messages for early event emission. | |
.apply("emit early global windowing", | |
Window.<Integer>into(new GlobalWindows()) | |
.triggering( | |
AfterWatermark.pastEndOfWindow() | |
.withEarlyFirings( | |
AfterProcessingTime.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardSeconds(1)))) | |
.discardingFiredPanes() | |
.withAllowedLateness(Duration.ZERO)) | |
.apply(Reshuffle.viaRandomKey()) | |
// [End] logic copied from HL7v2IO.ListHL7v2Messages for early event emission. | |
.apply(WithTimestamps.of((Integer i) -> Instant.now())) | |
.apply("processing time windowing", Window.<Integer>into(FixedWindows.of(Duration.standardSeconds(1)))) | |
.apply("Count per processing time window", Combine.globally(Count.<Integer>combineFn()).withoutDefaults()); | |
PAssert.that(out).satisfies((Iterable<Long> it) -> { | |
for (Long count : it){ | |
Assert.assertFalse(String.format("each window should contain at most 2 elements if emitting early got %s", count), count > 2); | |
} | |
return null; | |
}); | |
pipeline.run().waitUntilFinish(); | |
} | |
} | |
/** RESULTS CONFIRM THAT A SINGLE ELEMENT'S OUTPUTS CANNOT BE OUTPUT EAGERLY WITH TRIGGERING:
 * Count per processing time window/Values/Values/Map/ParMultiDo(Anonymous).output: each window should contain at most 2 elements if emitting early got 10
 */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment