Skip to content

Instantly share code, notes, and snippets.

@krolen
Created April 23, 2016 14:55
Show Gist options
  • Save krolen/ed1344e4d7be5b2116061685268651f5 to your computer and use it in GitHub Desktop.
Save krolen/ed1344e4d7be5b2116061685268651f5 to your computer and use it in GitHub Desktop.
Flink - counting trigger that correctly flushes all windows
public class FinishingCountTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ValueStateDescriptor<Long> stateDesc = new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
public FinishingCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
ValueState<Long> count = ctx.getPartitionedState(stateDesc);
long currentCount = count.value() + 1;
if(currentCount == 1) {
ctx.registerEventTimeTimer(Long.MAX_VALUE - 1);
}
count.update(currentCount);
if (currentCount >= maxCount) {
count.update(0L);
ctx.deleteEventTimeTimer(Long.MAX_VALUE - 1);
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
if(time == Long.MAX_VALUE - 1) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public String toString() {
return "FinishingCountTrigger(" + maxCount + ")";
}
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<Long, String>> source = env.addSource(new SourceFunction<Tuple2<Long, String>>() {
@Override
public void run(SourceContext<Tuple2<Long, String>> ctx) throws Exception {
LongStream.range(0, 43).forEach(l -> {
ctx.collect(Tuple2.of(0L, "This is " + l));
});
}
@Override
public void cancel() {}
});
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
source
.keyBy(0)
.window(GlobalWindows.create())
.trigger(new FinishingCountTrigger<>(5))
.apply(new WindowFunction<Tuple2<Long, String>, String, Tuple, GlobalWindow>() {
@Override
public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<Long, String>> input, Collector<String> out) throws Exception {
System.out.println("!!!!!!!!! " + Thread.currentThread().getId() + ": " + Joiner.on(",").join(input));
out.collect(input.iterator().next().f1);
}
})
.print();
env.execute("yoyoyo");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment