Skip to content

Instantly share code, notes, and snippets.

@shikhar
Last active November 30, 2021 10:16
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shikhar/2cb9f1b792be31b7c16e to your computer and use it in GitHub Desktop.
Save shikhar/2cb9f1b792be31b7c16e to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import java.io.IOException;
/**
 * A {@link Trigger} that fires once the number of elements in a pane reaches the given count or the
 * timeout expires, whichever happens first.
 *
 * <p>The timeout is measured in processing time ({@link System#currentTimeMillis()}) and starts with
 * the first element received after the previous firing. Every firing resets both the element count
 * and the deadline, so the next element starts a fresh batch.
 *
 * <p>This trigger returns {@link TriggerResult#FIRE} (it does not purge the pane itself); wrap it in a
 * purging trigger if the window contents should be discarded after each firing.
 *
 * @param <T> The type of elements.
 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
 */
public class CountWithTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    /** Value kept in the deadline state while no timeout timer is pending. */
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    /** Number of elements after which the trigger fires. */
    private final long maxCount;
    /** Processing-time delay, measured from the first element of a batch, after which the trigger fires. */
    private final long timeoutMs;

    /** Number of elements seen since the last firing. */
    private final ValueStateDescriptor<Long> countDesc =
            new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
    /** Processing time at which the current batch times out, or {@link #NO_DEADLINE} if none is pending. */
    private final ValueStateDescriptor<Long> deadlineDesc =
            new ValueStateDescriptor<>("deadline", LongSerializer.INSTANCE, NO_DEADLINE);

    private CountWithTimeoutTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /**
     * Counts the element and fires if either the count limit is reached or the current batch's deadline
     * has already passed. The first element of a batch registers the timeout timer.
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws IOException {
        final ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
        final ValueState<Long> count = ctx.getPartitionedState(countDesc);
        final long currentDeadline = deadline.value();
        final long currentTimeMs = System.currentTimeMillis();
        final long newCount = count.value() + 1;

        if (currentTimeMs >= currentDeadline || newCount >= maxCount) {
            // The batch is being emitted now, so its timeout timer is obsolete: remove it rather than
            // leaving it registered to fire later as a no-op.
            if (currentDeadline != NO_DEADLINE) {
                ctx.deleteProcessingTimeTimer(currentDeadline);
            }
            return fire(deadline, count);
        }

        if (currentDeadline == NO_DEADLINE) {
            // First element of a new batch: start the timeout clock.
            final long nextDeadline = deadlineAfter(currentTimeMs);
            deadline.update(nextDeadline);
            ctx.registerProcessingTimeTimer(nextDeadline);
        }
        count.update(newCount);
        return TriggerResult.CONTINUE;
    }

    /** Event time is not used by this trigger. */
    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    /** Fires when the timer for the current batch's deadline goes off. */
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        final ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
        final long currentDeadline = deadline.value();
        // Fire only if the deadline hasn't changed since this timer was registered; a timer that belongs
        // to an already-emitted batch must not fire the next one.
        if (currentDeadline == time) {
            return fire(deadline, ctx.getPartitionedState(countDesc));
        }
        return TriggerResult.CONTINUE;
    }

    /** Removes the pending timeout timer (if any) and all per-window state held by this trigger. */
    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        final ValueState<Long> deadline = ctx.getPartitionedState(deadlineDesc);
        final long deadlineValue = deadline.value();
        if (deadlineValue != NO_DEADLINE) {
            ctx.deleteProcessingTimeTimer(deadlineValue);
        }
        deadline.clear();
        // The count is per-window state too; without this it would outlive the window.
        ctx.getPartitionedState(countDesc).clear();
    }

    /** Resets the batch state (no pending deadline, zero elements) and signals a firing. */
    private TriggerResult fire(ValueState<Long> deadline, ValueState<Long> count) throws IOException {
        deadline.update(NO_DEADLINE);
        count.update(0L);
        return TriggerResult.FIRE;
    }

    /**
     * Returns {@code nowMs + timeoutMs}, saturated just below {@link #NO_DEADLINE} so that a very large
     * timeout cannot overflow into the past (which would make every element fire immediately) and cannot
     * collide with the "no deadline" sentinel.
     */
    private long deadlineAfter(long nowMs) {
        return timeoutMs >= NO_DEADLINE - nowMs ? NO_DEADLINE - 1 : nowMs + timeoutMs;
    }

    @Override
    public String toString() {
        return "CountWithTimeoutTrigger(" + maxCount + "," + timeoutMs + ")";
    }

    /**
     * Creates a trigger that fires after {@code maxCount} elements or {@code timeoutMs} milliseconds of
     * processing time since the first element of a batch, whichever comes first.
     *
     * @param maxCount  number of elements that causes a firing
     * @param timeoutMs processing-time timeout in milliseconds, measured from the first element of a batch
     * @param <T>       the type of elements
     * @param <W>       the type of windows
     * @return a new trigger instance
     */
    public static <T, W extends Window> CountWithTimeoutTrigger<T, W> of(long maxCount, long timeoutMs) {
        return new CountWithTimeoutTrigger<>(maxCount, timeoutMs);
    }
}
@shikhar
Copy link
Author

shikhar commented Feb 15, 2016

Enable batching with this trigger via `dataStream.batched(count, timeoutMs)`:

  implicit class Batching[T](stream: DataStream[T]) {

    /**
     * Collects the whole stream into a single global window and emits (then purges) it whenever
     * `count` elements have arrived or the batch's `timeoutMs` has elapsed, whichever comes first.
     */
    def batched(count: Int, timeoutMs: Long): AllWindowedStream[T, _ <: Window] = {
      val assigner = GlobalWindows.create().asInstanceOf[WindowAssigner[T, Window]]
      val batchTrigger = PurgingTrigger.of(CountWithTimeoutTrigger.of[T, Window](count, timeoutMs))
      stream.windowAll(assigner).trigger(batchTrigger)
    }

  }

@vprabs
Copy link

vprabs commented Jun 28, 2016

Don't you need to synchronize the "onElement" and "onProcessingTime" methods?

@RaccoonDev
Copy link

Is a clear() call missing for the count state?

@shikhar
Copy link
Author

shikhar commented Oct 27, 2021

Sorry folks, I haven't used Flink in a loooong time and can't speak to the correctness of the code.

@RaccoonDev
Copy link

No worries! Maybe someone who finds this will tweak it to their own needs :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment