Skip to content

Instantly share code, notes, and snippets.

@eliaslevy
Created August 31, 2017 00:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eliaslevy/43eca44e92fdef44e6717c60ea46e4d2 to your computer and use it in GitHub Desktop.
Save eliaslevy/43eca44e92fdef44e6717c60ea46e4d2 to your computer and use it in GitHub Desktop.
EarlyFiringEventTimeTrigger
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult.{CONTINUE, FIRE, FIRE_AND_PURGE, PURGE}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
/**
* A Trigger that fires early at a regular interval without purging if there have
* been new events added to the pane, and once purging when the watermark passes
* the end of the window to which a pane belongs.
*/
class EarlyFiringEventTimeTrigger(firingInterval: Long) extends Trigger[Object, TimeWindow] {
// in milliseconds
private val interval = firingInterval
private val firstStateDesc: ValueStateDescriptor[Boolean] =
new ValueStateDescriptor("first", new TypeHint[Boolean](){}.getTypeInfo(), true)
private val moreStateDesc: ValueStateDescriptor[Boolean] =
new ValueStateDescriptor("more", new TypeHint[Boolean](){}.getTypeInfo(), false)
def onElement(element: Object, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
val first: ValueState[Boolean] = ctx.getPartitionedState(firstStateDesc)
val more: ValueState[Boolean] = ctx.getPartitionedState(moreStateDesc )
if (first.value) {
val nextEarlyFire = (timestamp - (timestamp % interval)) + interval
val nextFire = math.min(nextEarlyFire, window.maxTimestamp)
ctx.registerEventTimeTimer(nextFire)
first.update(false)
}
if (!more.value) more.update(true)
CONTINUE
}
def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
val more: ValueState[Boolean] = ctx.getPartitionedState(moreStateDesc)
if (time >= window.maxTimestamp) {
if (more.value) {
more.update(false)
FIRE_AND_PURGE
} else {
PURGE
}
} else {
ctx.registerEventTimeTimer(time + interval)
if (more.value) {
more.update(false)
FIRE
} else {
CONTINUE
}
}
}
def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
CONTINUE
}
override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {
ctx.getPartitionedState(firstStateDesc).clear()
ctx.getPartitionedState(moreStateDesc ).clear()
ctx.deleteEventTimeTimer(window.maxTimestamp)
}
override def toString = "EarlyFiringEventTimeTrigger()"
}
object EarlyFiringEventTimeTrigger {
def of(interval: Time) = new EarlyFiringEventTimeTrigger(interval.toMilliseconds)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment