Skip to content

Instantly share code, notes, and snippets.

@pbartoszek
Created November 7, 2017 17:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pbartoszek/b9e7d96c75cff52076125ef47d3f69f9 to your computer and use it in GitHub Desktop.
Save pbartoszek/b9e7d96c75cff52076125ef47d3f69f9 to your computer and use it in GitHub Desktop.
Beam AfterProcessingTime trigger issue
package com.pawel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import static org.joda.time.Duration.standardSeconds;
/**
 * Reproduces a Beam issue with a repeated {@code AfterProcessingTime} trigger firing on late data.
 *
 * <p>Elements stamped in November 2017 are fed after the watermark has been advanced to the
 * current wall-clock time. They are windowed into one-minute fixed windows, counted per element,
 * and written with one output file per window pane so that the pane layout can be inspected.
 *
 * <p>The class is {@link Serializable} because the anonymous {@link DoFn} in the test method
 * captures the enclosing instance.
 */
public class LateEventTest implements Serializable {

    /** Event timestamp shared by every element fed into the test stream. */
    private static final Instant EVENT_TIME = new Instant("2017-11-06T12:10:03+0000");

    /** Start of the fixed window that {@link #EVENT_TIME} falls into. */
    private static final Instant WINDOW_START = new Instant("2017-11-06T12:10:00+0000");

    /** Size of the fixed windows applied to the stream. */
    private static final Duration WINDOW_SIZE = standardSeconds(60);

    /**
     * Feeds three bursts of elements ("A", "B", "C"), two of each, separated by ten seconds of
     * processing time. Asserts that every per-element count is produced in the window, and writes
     * each pane to its own file for manual inspection.
     */
    @Test
    public void testLateDataTriggering() throws Exception {
        TestStream<String> stream = TestStream.<String>create(StringUtf8Coder.of())
                // Moving the watermark to "now" first places every 2017-stamped element behind it.
                .advanceWatermarkTo(Instant.now())
                .addElements(TimestampedValue.of("A", EVENT_TIME))
                .addElements(TimestampedValue.of("A", EVENT_TIME))
                .advanceProcessingTime(Duration.standardSeconds(10))
                .addElements(TimestampedValue.of("B", EVENT_TIME))
                .addElements(TimestampedValue.of("B", EVENT_TIME))
                .advanceProcessingTime(Duration.standardSeconds(10))
                .addElements(TimestampedValue.of("C", EVENT_TIME))
                .addElements(TimestampedValue.of("C", EVENT_TIME))
                .advanceWatermarkToInfinity();

        PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
        Pipeline pipeline = TestPipeline.create(pipelineOptions);

        PCollection<String> dataCollection = pipeline.apply(stream)
                .apply(Window.<String>into(FixedWindows.of(WINDOW_SIZE))
                        .triggering(Repeatedly.forever(
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardSeconds(1))))
                        // The very large allowed lateness keeps the late elements from being dropped.
                        .withAllowedLateness(Duration.standardDays(1000), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                        .accumulatingFiredPanes())
                .apply(Count.perElement())
                // Switches only the accumulation mode downstream; the window function is left unchanged.
                .apply(Window.<KV<String, Long>>configure().discardingFiredPanes())
                .apply(ParDo.of(new DoFn<KV<String, Long>, String>() {
                    @ProcessElement
                    public void process(ProcessContext context) {
                        context.output(context.element().toString());
                    }
                }));

        dataCollection
                .apply(TextIO.write()
                        .to("test")
                        .withWindowedWrites()
                        .withFilenamePolicy(new TestFilenamePolicy())
                        .withNumShards(1));

        // inWindow() checks the union of all panes in the window, not the per-pane split.
        PAssert.that(dataCollection)
                .inWindow(new IntervalWindow(WINDOW_START, WINDOW_START.plus(WINDOW_SIZE)))
                .containsInAnyOrder("KV{A, 2}", "KV{B, 2}", "KV{C, 2}");

        // The per-pane split could not be asserted with PAssert; compare the files written by
        // TextIO (named by TestFilenamePolicy) against the listing below instead.
        /*
        EXPECTED:
        ./12:10:00-12:11:00-pane-0-late-first
        KV{A, 2}
        ./12:10:00-12:11:00-pane-1-late
        KV{B, 2}
        ./12:10:00-12:11:00-pane-2-late-last
        KV{C, 2}
        ACTUAL RESULTS PRODUCED BY THIS TEST:
        when using BEAM 2.0.0
        ./12:10:00-12:11:00-pane-0-late-first-last
        KV{A, 2}
        KV{B, 2}
        KV{C, 2}
        when using BEAM 2.1.0
        ./12:10:00-12:11:00-pane-0-late-first
        KV{A, 2}
        ./12:10:00-12:11:00-pane-1-late
        KV{B, 2}
        KV{C, 2}
        */

        pipeline.run().waitUntilFinish();
    }

    /**
     * Combines strings into a map from each distinct string to its number of occurrences.
     *
     * <p>Not referenced by the test in this class. {@link #extractOutput} prints the accumulator
     * to stdout as a debugging aid.
     */
    private static class CustomCombiner extends Combine.CombineFn<String, Map<String, Integer>, Map<String, Integer>> {
        @Override
        public Map<String, Integer> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public Map<String, Integer> addInput(Map<String, Integer> accumulator, String input) {
            accumulator.merge(input, 1, Integer::sum);
            return accumulator;
        }

        @Override
        public Map<String, Integer> mergeAccumulators(Iterable<Map<String, Integer>> accumulators) {
            Map<String, Integer> merged = new HashMap<>();
            for (Map<String, Integer> accumulator : accumulators) {
                // Iterate entries directly instead of keySet() + get(), avoiding a second lookup.
                accumulator.forEach((key, count) -> merged.merge(key, count, Integer::sum));
            }
            return merged;
        }

        @Override
        public Map<String, Integer> extractOutput(Map<String, Integer> accumulator) {
            System.out.println("EXTRACT OUTPUT : " + accumulator);
            return accumulator;
        }
    }

    /**
     * Names each output file {@code out/<window>-<pane>}, resolved against the sink's output
     * directory, so that every pane of every window is written to its own file. The
     * {@code extension} argument is not appended.
     *
     * <p>Static so that serializing the policy with the sink does not also serialize the enclosing
     * test instance.
     */
    public static class TestFilenamePolicy extends FileBasedSink.FilenamePolicy {
        @Override
        public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext windowedContext, String extension) {
            String windowStr = new BoundedWindowToString().apply(windowedContext.getWindow());
            String paneStr = new PaneInfoToString().apply(windowedContext.getPaneInfo());
            String path = String.format("out/%s-%s", windowStr, paneStr);
            return outputDirectory.resolve(path, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
        }

        /** The test only performs windowed writes, so no unwindowed name is produced. */
        @Nullable
        @Override
        public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
            return null;
        }
    }

    /**
     * Renders a window as {@code GlobalWindow}, or as {@code <start>-<end>} for an
     * {@link IntervalWindow}, using an hour:minute:second format for each bound.
     */
    static class BoundedWindowToString implements SerializableFunction<BoundedWindow, String> {
        private static final DateTimeFormatter WINDOW_FORMATTER = ISODateTimeFormat.hourMinuteSecond();
        private static final String GLOBAL_WINDOW = "GlobalWindow";
        private static final String WINDOW_TEMPLATE = "%s-%s";

        /**
         * @param boundedWindow the window to render
         * @return the window's string form
         * @throws IllegalArgumentException if the window is neither global nor an interval window
         */
        @Override
        public String apply(BoundedWindow boundedWindow) {
            if (boundedWindow instanceof GlobalWindow) {
                return GLOBAL_WINDOW;
            }
            if (!(boundedWindow instanceof IntervalWindow)) {
                // Fail with a descriptive message rather than an opaque ClassCastException.
                throw new IllegalArgumentException("Unsupported window type "
                        + boundedWindow.getClass().getName() + "; expected GlobalWindow or IntervalWindow");
            }
            IntervalWindow iw = (IntervalWindow) boundedWindow;
            return String.format(WINDOW_TEMPLATE, WINDOW_FORMATTER.print(iw.start()), WINDOW_FORMATTER.print(iw.end()));
        }
    }

    /**
     * Renders a {@link PaneInfo} as {@code pane-<index>-<timing>}, followed by {@code -first}
     * and/or {@code -last} when applicable (for example {@code pane-0-late-first}).
     *
     * <p>Static so that instances do not hold a reference to the enclosing test.
     */
    public static class PaneInfoToString implements SerializableFunction<PaneInfo, String> {
        @Override
        public String apply(PaneInfo paneInfo) {
            String paneString = createPaneString(paneInfo);
            paneString = appendTimingInfo(paneString, paneInfo);
            paneString = appendPaneFirstLastStatus(paneString, paneInfo);
            return paneString;
        }

        /** Starts the string with the pane index; Locale.ROOT keeps the digits ASCII in every locale. */
        private String createPaneString(PaneInfo paneInfo) {
            return String.format(Locale.ROOT, "pane-%d", paneInfo.getIndex());
        }

        /** Appends {@code -first} and/or {@code -last} for the first/last pane of the window. */
        private String appendPaneFirstLastStatus(String paneString, PaneInfo paneInfo) {
            if (paneInfo.isFirst()) {
                paneString = String.format("%s-first", paneString);
            }
            if (paneInfo.isLast()) {
                paneString = String.format("%s-last", paneString);
            }
            return paneString;
        }

        /** Appends the pane's timing in lower case. */
        private String appendTimingInfo(String paneString, PaneInfo paneInfo) {
            return append(paneString, paneInfo.getTiming());
        }

        private String append(String paneString, PaneInfo.Timing timing) {
            // Locale.ROOT: with the default locale, a Turkish JVM lower-cases the 'I' in ON_TIME to
            // a dotless 'ı', which would change the generated file name.
            return String.format("%s-%s", paneString, timing.toString().toLowerCase(Locale.ROOT));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment