Skip to content

Instantly share code, notes, and snippets.

@pnowojski
Created May 11, 2018 11:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pnowojski/8cd650170925cf35be521cf236f1d97a to your computer and use it in GitHub Desktop.
Save pnowojski/8cd650170925cf35be521cf236f1d97a to your computer and use it in GitHub Desktop.
/**
 * Small Flink job demonstrating how late elements can be routed to a side output.
 *
 * <p>Each element's event-time timestamp is its own integer value, interpreted as milliseconds.
 * Because {@code 1337} arrives before {@code 42, 3, 4, 5}, the watermark jumps ahead and the
 * later, smaller values may be late for their windows. Late elements are emitted on the
 * {@code "late-data"} side output instead of being silently dropped.
 */
public class LateDataJob {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Anonymous subclass ("{}") so the element type is captured despite generic erasure.
        final OutputTag<Integer> lateOutputTag = new OutputTag<Integer>("late-data") {};

        SingleOutputStreamOperator<Integer> data = env.fromElements(1, 2, 1337, 42, 3, 4, 5)
                // TimestampExtractor is both a periodic and a punctuated assigner; the cast selects
                // the punctuated overload so a watermark is checked after every element.
                .assignTimestampsAndWatermarks(
                        (AssignerWithPunctuatedWatermarks<Integer>) new TimestampExtractor(Time.milliseconds(10)))
                // Constant key: every element is assigned key 0, so all elements share one key.
                .keyBy(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) throws Exception {
                        return 0;
                    }
                })
                // Every timestamp here is far below one day, so all elements should fall into a
                // single window and nothing should reach the late-data side output. Switching to
                // Time.milliseconds(10) makes values that arrive after 1337 (e.g. 42, 3, 4, 5)
                // fall behind the watermark plus allowed lateness, exercising the side output.
                .timeWindow(Time.days(1))
                .allowedLateness(Time.milliseconds(10))
                .sideOutputLateData(lateOutputTag)
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) throws Exception {
                        return a + b;
                    }
                });

        // Window results and late elements are both printed to stderr.
        data.printToErr();
        data.getSideOutput(lateOutputTag).printToErr();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    /**
     * Uses each integer as its own event-time timestamp and produces a watermark that trails the
     * largest timestamp seen so far by {@code maxOutOfOrderness}.
     *
     * <p>It extends the periodic {@link BoundedOutOfOrdernessTimestampExtractor} but also
     * implements {@link AssignerWithPunctuatedWatermarks}, so the same watermark can be checked
     * after every element instead of on a timer.
     */
    private static final class TimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Integer>
            implements AssignerWithPunctuatedWatermarks<Integer> {

        /**
         * @param maxOutOfOrderness how far the watermark trails the maximum timestamp seen
         */
        public TimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }

        /** Returns the element's own value as its timestamp. */
        @Override
        public long extractTimestamp(Integer element) {
            return element;
        }

        /**
         * Returns the current bounded-out-of-orderness watermark after every element.
         *
         * <p>Flink invokes {@code extractTimestamp(element, previousTimestamp)} for the same
         * element right before this method, and that call already advances the maximum
         * timestamp tracked by the base class. This method therefore only reads that state and
         * does not extract the timestamp again.
         *
         * @param lastElement the element that was just processed
         * @param extractedTimestamp the timestamp extracted for {@code lastElement}
         * @return the watermark derived from the maximum timestamp seen so far
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
            return getCurrentWatermark();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment