Created
November 7, 2017 17:13
-
-
Save pbartoszek/b9e7d96c75cff52076125ef47d3f69f9 to your computer and use it in GitHub Desktop.
Beam AfterProcessingTime trigger issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.pawel; | |
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import static org.joda.time.Duration.standardSeconds;
public class LateEventTest implements Serializable { | |
@Test | |
public void testLateDataTriggering() throws Exception { | |
TestStream<String> stream = TestStream.<String>create(StringUtf8Coder.of()) | |
.advanceWatermarkTo(Instant.now()) | |
.addElements(TimestampedValue.of("A",new Instant("2017-11-06T12:10:03+0000"))) | |
.addElements(TimestampedValue.of("A",new Instant("2017-11-06T12:10:03+0000"))) | |
.advanceProcessingTime(Duration.standardSeconds(10)) | |
.addElements(TimestampedValue.of("B",new Instant("2017-11-06T12:10:03+0000"))) | |
.addElements(TimestampedValue.of("B",new Instant("2017-11-06T12:10:03+0000"))) | |
.advanceProcessingTime(Duration.standardSeconds(10)) | |
.addElements(TimestampedValue.of("C",new Instant("2017-11-06T12:10:03+0000"))) | |
.addElements(TimestampedValue.of("C",new Instant("2017-11-06T12:10:03+0000"))) | |
.advanceWatermarkToInfinity(); | |
PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); | |
Pipeline pipeline = TestPipeline.create(pipelineOptions); | |
PCollection<String> dataCollection = pipeline.apply(stream) | |
.apply(Window.<String>into(FixedWindows.of(standardSeconds(60))) | |
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))) | |
.withAllowedLateness(Duration.standardDays(1000), Window.ClosingBehavior.FIRE_IF_NON_EMPTY) | |
.accumulatingFiredPanes()) | |
.apply(Count.perElement()) | |
.apply(Window.<KV<String,Long>>configure().discardingFiredPanes()) | |
.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { | |
@ProcessElement | |
public void process(ProcessContext context) { | |
context.output(context.element().toString()); | |
} | |
})); | |
dataCollection | |
.apply(TextIO.write() | |
.to("test") | |
.withWindowedWrites() | |
.withFilenamePolicy(new TestFilenamePolicy()) | |
.withNumShards(1)); | |
PAssert.that(dataCollection).inWindow( | |
new IntervalWindow( | |
new Instant("2017-11-06T12:10:00+0000"), | |
new Instant("2017-11-06T12:10:00+0000").plus(Duration.standardSeconds(60)))) | |
.containsInAnyOrder("KV{A, 2}", "KV{B, 2}", "KV{C, 2}"); | |
//I wasn't able to write assertion for this using PAssert but the expected results should be: | |
/* | |
EXPECTED: | |
./12:10:00-12:11:00-pane-0-late-first | |
KV{A, 2} | |
./12:10:00-12:11:00-pane-1-late | |
KV{B, 2} | |
./12:10:00-12:11:00-pane-2-late-last | |
KV{C, 2} | |
ACTUAL RESULTS PRODUCED BY THIS TEST: | |
when using BEAM 2.0.0 | |
./12:10:00-12:11:00-pane-0-late-first-last | |
KV{A, 2} | |
KV{B, 2} | |
KV{C, 2} | |
when using BEAM 2.1.0 | |
./12:10:00-12:11:00-pane-0-late-first | |
KV{A, 2} | |
./12:10:00-12:11:00-pane-1-late | |
KV{B, 2} | |
KV{C, 2} | |
*/ | |
pipeline.run().waitUntilFinish(); | |
} | |
private static class CustomCombiner extends Combine.CombineFn<String, Map<String, Integer>, Map<String, Integer>> { | |
@Override | |
public Map<String, Integer> createAccumulator() { | |
return new HashMap<>(); | |
} | |
@Override | |
public Map<String, Integer> addInput(Map<String, Integer> accumulator, String input) { | |
Integer count = accumulator.getOrDefault(input, 0); | |
accumulator.put(input, count + 1); | |
return accumulator; | |
} | |
@Override | |
public Map<String, Integer> mergeAccumulators(Iterable<Map<String, Integer>> accumulators) { | |
Map<String, Integer> finalAccumulator = new HashMap<>(); | |
for (Map<String, Integer> accumulator : accumulators) { | |
accumulator.keySet().forEach(key -> { | |
finalAccumulator.merge(key, accumulator.get(key), (finalValue, accValue) -> finalValue + accValue); | |
}); | |
} | |
return finalAccumulator; | |
} | |
@Override | |
public Map<String, Integer> extractOutput(Map<String, Integer> accumulator) { | |
System.out.println("EXTRACT OUTPUT : " + accumulator); | |
return accumulator; | |
} | |
} | |
public class TestFilenamePolicy extends FileBasedSink.FilenamePolicy { | |
@Override | |
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext windowedContext, String extension) { | |
BoundedWindow window = windowedContext.getWindow(); | |
String windowStr = new BoundedWindowToString().apply(window); | |
String paneStr = new PaneInfoToString().apply(windowedContext.getPaneInfo()); | |
String path = String.format("out/%s-%s", windowStr, paneStr); | |
return outputDirectory.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); | |
} | |
@Nullable | |
@Override | |
public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) { | |
return null; | |
} | |
} | |
static class BoundedWindowToString implements SerializableFunction<BoundedWindow, String> { | |
private static final DateTimeFormatter WINDOW_FORMATTER = ISODateTimeFormat.hourMinuteSecond(); | |
private static final String GLOBAL_WINDOW = "GlobalWindow"; | |
private static final String WINDOW_TEMPLATE = "%s-%s"; | |
@Override | |
public String apply(BoundedWindow boundedWindow) { | |
if (boundedWindow instanceof GlobalWindow) { | |
return GLOBAL_WINDOW; | |
} | |
IntervalWindow iw = (IntervalWindow) boundedWindow; | |
return String.format(WINDOW_TEMPLATE, WINDOW_FORMATTER.print(iw.start()), WINDOW_FORMATTER.print(iw.end())); | |
} | |
} | |
public class PaneInfoToString implements SerializableFunction<PaneInfo, String> { | |
@Override | |
public String apply(PaneInfo paneInfo) { | |
String paneString = createPaneString(paneInfo); | |
paneString = appendTimingInfo(paneString, paneInfo); | |
paneString = appendPaneFirstLastStatus(paneString, paneInfo); | |
return paneString; | |
} | |
private String createPaneString(PaneInfo paneInfo) { | |
return String.format("pane-%d", paneInfo.getIndex()); | |
} | |
private String appendPaneFirstLastStatus(String paneString, PaneInfo paneInfo) { | |
if (paneInfo.isFirst()) { | |
paneString = String.format("%s-first", paneString); | |
} | |
if (paneInfo.isLast()) { | |
paneString = String.format("%s-last", paneString); | |
} | |
return paneString; | |
} | |
private String appendTimingInfo(String paneString, PaneInfo paneInfo) { | |
return append(paneString, paneInfo.getTiming()); | |
} | |
private String append(String paneString, PaneInfo.Timing timing) { | |
return String.format("%s-%s", paneString, timing.toString().toLowerCase()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment