Created
May 12, 2017 15:19
-
-
Save fpompermaier/2556892fe32a577c4e2f304cc883d2f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private static class CustomTrigger extends Trigger { | |
private final long maxCount; | |
private final long maxTime; | |
private final ReducingStateDescriptor stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); | |
private final ReducingStateDescriptor stateDesc2 = new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE); | |
private CustomTrigger(long maxTime, long maxCount) { | |
this.maxCount = maxCount; | |
this.maxTime = maxTime; | |
} | |
@Override | |
public TriggerResult onElement(Row element, long timestamp, GlobalWindow window, | |
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) | |
throws Exception { | |
ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc2); | |
timestamp = ctx.getCurrentProcessingTime(); | |
if (fireTimestamp.get() == null) { | |
long start = timestamp - (timestamp % maxTime); | |
long nextFireTimestamp = start + maxTime; | |
ctx.registerProcessingTimeTimer(nextFireTimestamp); | |
fireTimestamp.add(nextFireTimestamp); | |
return TriggerResult.CONTINUE; | |
} | |
ReducingState count = ctx.getPartitionedState(stateDesc); | |
count.add(1L); | |
if (count.get() >= maxCount) { | |
count.clear(); | |
return TriggerResult.FIRE_AND_PURGE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onProcessingTime(long time, GlobalWindow window, | |
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) | |
throws Exception { | |
ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc2); | |
if (fireTimestamp.get().equals) { | |
// fireTimestamp.clear(); f | |
ireTimestamp.add(time + maxTime); | |
ctx.registerProcessingTimeTimer(time + maxTime); | |
return TriggerResult.FIRE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onEventTime(long time, GlobalWindow window, | |
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) | |
throws Exception { | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear(GlobalWindow window, | |
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx) | |
throws Exception { | |
ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc2); | |
long timestamp = fireTimestamp.get(); | |
ctx.deleteProcessingTimeTimer(timestamp); | |
ctx.getPartitionedState(stateDesc).clear(); | |
fireTimestamp.clear(); | |
} | |
public static CustomTrigger of(Time maxTime, long maxCount) { | |
return new CustomTrigger(maxTime.toMilliseconds(), maxCount); | |
} | |
private static class Sum implements ReduceFunction { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Long reduce(Long value1, Long value2) throws Exception { | |
return value1 + value2; | |
} | |
} | |
private static class Min implements ReduceFunction { | |
private static final long serialVersionUID = 1L; | |
@Override | |
public Long reduce(Long value1, Long value2) throws Exception { | |
return Math.min(value1, value2); | |
} | |
} | |
} | |
}}} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment