Created
May 11, 2018 11:17
-
-
Save pnowojski/8cd650170925cf35be521cf236f1d97a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class LateDataJob { | |
public static void main(String[] args) throws Exception { | |
// set up the streaming execution environment | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
final OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late-data"){}; | |
SingleOutputStreamOperator<Integer> data = env.fromElements(1, 2, 1337, 42, 3, 4, 5) | |
.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks<Integer>) new TimestampExtractor(Time.milliseconds(10))) | |
.keyBy(new KeySelector<Integer, Integer>() { | |
@Override | |
public Integer getKey(Integer integer) throws Exception { | |
return 0; | |
} | |
}) | |
.timeWindow(Time.days(1)) | |
// .timeWindow(Time.milliseconds(10)) | |
.allowedLateness(Time.milliseconds(10)) | |
.sideOutputLateData(lateOutputTag) | |
.reduce(new ReduceFunction<Integer>() { | |
@Override | |
public Integer reduce(Integer a, Integer b) throws Exception { | |
return a + b; | |
} | |
}); | |
data.printToErr(); | |
data.getSideOutput(lateOutputTag).printToErr(); | |
// env.fromElements(42, 1337).printToErr(); | |
// execute program | |
env.execute("Flink Streaming Java API Skeleton"); | |
} | |
private static class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Integer> implements AssignerWithPunctuatedWatermarks<Integer> { | |
public TimestampExtractor(Time maxOutOfOrderness) { | |
super(maxOutOfOrderness); | |
} | |
@Override | |
public long extractTimestamp(Integer integer) { | |
return integer; | |
} | |
@Nullable | |
@Override | |
public Watermark checkAndGetNextWatermark(Integer integer, long l) { | |
extractTimestamp(integer, l); | |
return getCurrentWatermark(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment