Skip to content

Instantly share code, notes, and snippets.

@marquesds
Created March 24, 2020 19:14
Show Gist options
  • Save marquesds/5b59e148e27a58f099d0a29262e5877c to your computer and use it in GitHub Desktop.
Save marquesds/5b59e148e27a58f099d0a29262e5877c to your computer and use it in GitHub Desktop.
Flink count window with timeout
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.triggers.{TriggerResult, _}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
 * Window trigger that fires when a time window has received `maxCount` elements since the
 * last count-based firing, or when the window's time runs out, whichever comes first.
 *
 * Firing behaviour:
 *  - Count reached: returns `FIRE` and resets the counter. The window contents are NOT
 *    purged here, so a later firing of the same window sees the earlier elements again.
 *  - Timeout: returns `FIRE_AND_PURGE` and resets the counter.
 *
 * The timeout relies on a timer that this trigger registers at `window.maxTimestamp()` for
 * every element. Without that registration the timer callbacks would only run for timers
 * registered by someone else.
 *
 * @param maxCount           number of elements after which the window fires early
 * @param timeCharacteristic time domain used for the timeout timer; `ProcessingTime` uses
 *                           processing-time timers, any other value uses event-time timers.
 *                           Presumably this must match the window assigner in use -- confirm
 *                           at the call site.
 * @tparam W the concrete [[TimeWindow]] type the trigger operates on
 */
@SerialVersionUID(1L)
case class FlinkCountWindowWithTimeout[W <: TimeWindow](maxCount: Long, timeCharacteristic: TimeCharacteristic) extends Trigger[Object, W] {
  import org.apache.flink.api.common.state.ReducingStateDescriptor
  import org.apache.flink.api.common.typeutils.base.LongSerializer

  // Per-key, per-window counter of elements seen since the last firing.
  private val stateDesc = new ReducingStateDescriptor[java.lang.Long]("count", new Sum(), LongSerializer.INSTANCE)

  /**
   * Counts the element and fires once `maxCount` is reached (or the element's timestamp is
   * already at/after the window end). Also makes sure the timeout timer is registered.
   */
  override def onElement(element: Object, timestamp: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
    // Registering the same timestamp repeatedly is harmless; it guarantees the timeout
    // callback runs even when the count is never reached.
    registerTimeoutTimer(window, ctx)

    val count = ctx.getPartitionedState(stateDesc)
    count.add(1L)
    if (count.get >= maxCount || timestamp >= window.getEnd) {
      count.clear()
      TriggerResult.FIRE
    } else {
      TriggerResult.CONTINUE
    }
  }

  /** Handles processing-time timers; ignored when the trigger is configured for event time. */
  override def onProcessingTime(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.EventTime) {
      TriggerResult.CONTINUE
    } else {
      onTimeout(time, window, ctx)
    }
  }

  /** Handles event-time timers; ignored when the trigger is configured for processing time. */
  override def onEventTime(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime) {
      TriggerResult.CONTINUE
    } else {
      onTimeout(time, window, ctx)
    }
  }

  /** Removes the timeout timer and the counter state for the given window. */
  override def clear(window: W, ctx: Trigger.TriggerContext): Unit = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime) {
      ctx.deleteProcessingTimeTimer(window.maxTimestamp())
    } else {
      ctx.deleteEventTimeTimer(window.maxTimestamp())
    }
    ctx.getPartitionedState(stateDesc).clear()
  }

  /** Registers the timeout timer at the window's max timestamp in the configured time domain. */
  private def registerTimeoutTimer(window: W, ctx: Trigger.TriggerContext): Unit = {
    if (timeCharacteristic == TimeCharacteristic.ProcessingTime) {
      ctx.registerProcessingTimeTimer(window.maxTimestamp())
    } else {
      ctx.registerEventTimeTimer(window.maxTimestamp())
    }
  }

  /**
   * Decides what a timer firing at `time` means for `window`: timers at or beyond the window
   * end are ignored; earlier ones (the timeout timer) flush and purge the window.
   */
  private def onTimeout(time: Long, window: W, ctx: Trigger.TriggerContext): TriggerResult = {
    if (time >= window.getEnd) {
      TriggerResult.CONTINUE
    } else {
      // The window contents are purged, so the counter must restart from zero as well.
      ctx.getPartitionedState(stateDesc).clear()
      TriggerResult.FIRE_AND_PURGE
    }
  }
}
/**
 * Reduce function that combines two boxed longs by adding them.
 *
 * Both arguments are unboxed before the addition, so a `null` argument results in a
 * `NullPointerException`.
 */
@SerialVersionUID(1L)
class Sum extends ReduceFunction[java.lang.Long] {
  @throws[Exception]
  override def reduce(value1: java.lang.Long, value2: java.lang.Long): java.lang.Long = {
    val total: Long = value1.longValue() + value2.longValue()
    java.lang.Long.valueOf(total)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment