Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save fpompermaier/2556892fe32a577c4e2f304cc883d2f9 to your computer and use it in GitHub Desktop.
Save fpompermaier/2556892fe32a577c4e2f304cc883d2f9 to your computer and use it in GitHub Desktop.
/**
 * Trigger for a {@link GlobalWindow} of {@link Row}s that fires either when a number of
 * elements has been seen or when a processing-time timer elapses.
 *
 * <ul>
 *   <li>Count: every element increments a per-key counter; once it reaches {@code maxCount}
 *       the counter is reset and the window is fired and purged.</li>
 *   <li>Time: on the first element a processing-time timer is registered at the next multiple
 *       of {@code maxTime}; each time that timer fires the window is fired and the timer is
 *       re-registered {@code maxTime} milliseconds later.</li>
 * </ul>
 */
private static class CustomTrigger extends Trigger<Row, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    /** Number of elements after which the window is fired and purged. */
    private final long maxCount;
    /** Interval between time-based firings, in milliseconds. */
    private final long maxTime;

    /** Running element count since the last count-based firing. */
    private final ReducingStateDescriptor<Long> countStateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
    /** Timestamp of the currently registered processing-time timer. */
    private final ReducingStateDescriptor<Long> fireTimeStateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

    /**
     * @param maxTime interval between time-based firings in milliseconds; must be positive
     * @param maxCount element count that triggers a fire-and-purge
     * @throws IllegalArgumentException if {@code maxTime} is not positive
     */
    private CustomTrigger(long maxTime, long maxCount) {
        if (maxTime <= 0) {
            // onElement computes "now % maxTime"; zero would throw ArithmeticException there.
            throw new IllegalArgumentException("maxTime must be positive but was " + maxTime);
        }
        this.maxCount = maxCount;
        this.maxTime = maxTime;
    }

    /**
     * Registers the first processing-time timer if none is recorded yet, then counts the
     * element and fires-and-purges once {@code maxCount} elements have been counted.
     */
    @Override
    public TriggerResult onElement(Row element, long timestamp, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        if (fireTimestamp.get() == null) {
            // Align the first timer to the next multiple of maxTime in processing time.
            long now = ctx.getCurrentProcessingTime();
            long start = now - (now % maxTime);
            long nextFireTimestamp = start + maxTime;
            ctx.registerProcessingTimeTimer(nextFireTimestamp);
            fireTimestamp.add(nextFireTimestamp);
            // No early return here: the element that registers the timer must be counted too.
        }

        ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Fires the window when {@code time} is the recorded timer timestamp and schedules the
     * next timer {@code maxTime} milliseconds later; otherwise does nothing.
     */
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        Long registered = fireTimestamp.get();
        if (registered != null && registered.longValue() == time) {
            // The state uses a Min reducer, so it must be cleared before storing the later
            // timestamp; otherwise the old (smaller) value would be kept.
            fireTimestamp.clear();
            fireTimestamp.add(time + maxTime);
            ctx.registerProcessingTimeTimer(time + maxTime);
            // NOTE(review): this fires without purging and leaves the element count untouched,
            // unlike the count path which uses FIRE_AND_PURGE - confirm this is intended.
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Event time is not used by this trigger. */
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the pending processing-time timer, if any, and clears both state entries. */
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> fireTimestamp = ctx.getPartitionedState(fireTimeStateDesc);
        Long registered = fireTimestamp.get();
        if (registered != null) {
            // Guard against unboxing null when no element ever registered a timer.
            ctx.deleteProcessingTimeTimer(registered);
            fireTimestamp.clear();
        }
        ctx.getPartitionedState(countStateDesc).clear();
    }

    /**
     * Creates a trigger that fires every {@code maxTime} or after {@code maxCount} elements.
     *
     * @param maxTime interval between time-based firings
     * @param maxCount element count that triggers a fire-and-purge
     * @return the configured trigger
     */
    public static CustomTrigger of(Time maxTime, long maxCount) {
        return new CustomTrigger(maxTime.toMilliseconds(), maxCount);
    }

    /** Reducer that adds two values; used for the element counter. */
    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }

    /** Reducer that keeps the smaller of two values; used for the timer timestamp. */
    private static class Min implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return Math.min(value1, value2);
        }
    }
}
}}}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment