Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
public class SessionTrigger implements Trigger<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
private final long sessionTimeout;
private final long maxElements;
public SessionTrigger(long sessionTimeout, long maxElements) {
this.sessionTimeout = sessionTimeout;
this.maxElements = maxElements;
}
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
OperatorState<Long> countState = ctx.state("count", 0L);
OperatorState<Long> timestampState = ctx.state("max-timestamp", 0L);
if (timestamp > timestampState.value()) {
timestampState.update(timestamp);
ctx.registerEventTimeTimer(timestamp + sessionTimeout);
}
countState.update(countState.value() + 1);
if (countState.value() > maxElements) {
// fire the trigger and keep the elements that we have so far
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TriggerContext ctx) {
// fire the trigger and delete all window contents
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onProcessingTime(long time, TriggerContext ctx) throws Exception {
// ignore processing time triggers, we never set them
return TriggerResult.CONTINUE;
}
@Override
public String toString() {
return "SessionTrigger()";
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment