Created
June 6, 2016 23:41
-
-
Save eliaslevy/ec840444607b9a5dd5aa3eb2cdd77932 to your computer and use it in GitHub Desktop.
Modified EventTimeTriggerWithEarlyAndLateFiring with firing suppression if there aren't any new events between timers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dataartisans.beam_comparison.customTriggers; | |
package com.cisco.sbg.amp; | |
import org.apache.flink.api.common.state.ValueState; | |
import org.apache.flink.api.common.state.ValueStateDescriptor; | |
import org.apache.flink.api.common.typeutils.base.BooleanSerializer; | |
import org.apache.flink.api.common.typeutils.base.LongSerializer; | |
import org.apache.flink.hadoop.shaded.com.google.common.base.Preconditions; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.api.windowing.triggers.Trigger; | |
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; | |
import org.apache.flink.streaming.api.windowing.windows.TimeWindow; | |
import java.io.IOException; | |
public class EventTimeTriggerWithEarlyAndLateFiring extends Trigger<Object, TimeWindow> { | |
private static final long serialVersionUID = 1L; | |
private final boolean accumulating; | |
private final long allowedLateness; | |
private final long earlyFiringPeriod; | |
private final long lateFiringPeriod; | |
private final ValueStateDescriptor<Long> processingTimeTriggerDescriptor = | |
new ValueStateDescriptor<Long>("PROCESSING_TIME_TRIGGER", LongSerializer.INSTANCE, -1L); | |
private final ValueStateDescriptor<Boolean> isPastTheEndOfWindow = | |
new ValueStateDescriptor<Boolean>("AFTER_END_OF_WINDOW", BooleanSerializer.INSTANCE, false); | |
private final ValueStateDescriptor<Boolean> moreSinceFiredDescriptor = | |
new ValueStateDescriptor("MORE_SINCE_FIRED", BooleanSerializer.INSTANCE, false); | |
public static EventTimeTriggerWithEarlyAndLateFiring create() { | |
return new EventTimeTriggerWithEarlyAndLateFiring(false, 0L, 0L, 0L); | |
} | |
private EventTimeTriggerWithEarlyAndLateFiring(boolean accumulating, long allowedLateness, long earlyFiring, long lateFiring) { | |
Preconditions.checkArgument(earlyFiring >= 0); | |
Preconditions.checkArgument(lateFiring >= 0); | |
Preconditions.checkArgument(allowedLateness >= 0); | |
this.allowedLateness = allowedLateness; | |
this.earlyFiringPeriod = earlyFiring; | |
this.lateFiringPeriod = lateFiring; | |
this.accumulating = accumulating; | |
} | |
public EventTimeTriggerWithEarlyAndLateFiring withAllowedLateness(Time allowedLateness) { | |
return new EventTimeTriggerWithEarlyAndLateFiring(accumulating, allowedLateness.toMilliseconds(), earlyFiringPeriod, lateFiringPeriod); | |
} | |
public EventTimeTriggerWithEarlyAndLateFiring withEarlyFiringEvery(Time earlyFiringPeriod) { | |
return new EventTimeTriggerWithEarlyAndLateFiring(accumulating, allowedLateness, earlyFiringPeriod.toMilliseconds(), lateFiringPeriod); | |
} | |
public EventTimeTriggerWithEarlyAndLateFiring withLateFiringEvery(Time lateFiringPeriod) { | |
return new EventTimeTriggerWithEarlyAndLateFiring(accumulating, allowedLateness, earlyFiringPeriod, lateFiringPeriod.toMilliseconds()); | |
} | |
public EventTimeTriggerWithEarlyAndLateFiring accumulating() { | |
return new EventTimeTriggerWithEarlyAndLateFiring(true, allowedLateness, earlyFiringPeriod, lateFiringPeriod); | |
} | |
public EventTimeTriggerWithEarlyAndLateFiring discarding() { | |
return new EventTimeTriggerWithEarlyAndLateFiring(false, allowedLateness, earlyFiringPeriod, lateFiringPeriod); | |
} | |
@Override | |
public TriggerResult onElement(Object o, long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
if (timeWindow.maxTimestamp() + allowedLateness <= triggerContext.getCurrentWatermark()) { | |
return TriggerResult.PURGE; | |
} | |
triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp()); | |
ValueState<Long> processingTimeTrigger = triggerContext.getPartitionedState(processingTimeTriggerDescriptor); | |
if(processingTimeTrigger.value() == -1L) { | |
long nextProcessingTimeTriggerTimestamp = getNextProcessingTimeTriggerTimestamp(triggerContext); | |
processingTimeTrigger.update(nextProcessingTimeTriggerTimestamp); | |
triggerContext.registerProcessingTimeTimer(nextProcessingTimeTriggerTimestamp); | |
} | |
ValueState<Boolean> moreSinceFired = triggerContext.getPartitionedState(moreSinceFiredDescriptor); | |
if (!moreSinceFired.value()) { | |
moreSinceFired.update(true); | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public TriggerResult onProcessingTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
ValueState<Long> processingTimeTrigger = triggerContext.getPartitionedState(processingTimeTriggerDescriptor); | |
long nextProcessingTimeTriggerTimestamp = getNextProcessingTimeTriggerTimestamp(triggerContext); | |
processingTimeTrigger.update(nextProcessingTimeTriggerTimestamp); | |
triggerContext.registerProcessingTimeTimer(nextProcessingTimeTriggerTimestamp); | |
ValueState<Boolean> moreSinceFired = triggerContext.getPartitionedState(moreSinceFiredDescriptor); | |
if (moreSinceFired.value()) { | |
moreSinceFired.update(false); | |
return TriggerResult.FIRE; | |
} else { | |
return TriggerResult.CONTINUE; | |
} | |
} | |
private long getNextProcessingTimeTriggerTimestamp(TriggerContext triggerContext) throws IOException { | |
ValueState<Boolean> isAfterEndOfWindow = triggerContext.getPartitionedState(isPastTheEndOfWindow); | |
boolean afterEndOfWindow = isAfterEndOfWindow.value(); | |
return afterEndOfWindow ? | |
System.currentTimeMillis() + lateFiringPeriod : | |
System.currentTimeMillis() + earlyFiringPeriod; | |
} | |
@Override | |
public TriggerResult onEventTime(long time, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception { | |
if (time == timeWindow.maxTimestamp()) { | |
// mark that we have passed the end of the window in event time | |
ValueState<Boolean> endOfWindowToProcessing = triggerContext.getPartitionedState(isPastTheEndOfWindow); | |
boolean afterEndOfWindow = endOfWindowToProcessing.value(); | |
if(afterEndOfWindow) { | |
throw new RuntimeException("The end of window for this window has already been found."); | |
} | |
endOfWindowToProcessing.update(true); | |
// remove the already registered timer and update the processing time trigger | |
// so that it fires with the late firing interval | |
ValueState<Long> processingTimeTrigger = triggerContext.getPartitionedState(processingTimeTriggerDescriptor); | |
long timerToRemove = processingTimeTrigger.value(); | |
if(timerToRemove != -1) { | |
triggerContext.deleteProcessingTimeTimer(timerToRemove); | |
} | |
long nextProcessingTimeTriggerTimestamp = getNextProcessingTimeTriggerTimestamp(triggerContext); | |
processingTimeTrigger.update(nextProcessingTimeTriggerTimestamp); | |
triggerContext.registerProcessingTimeTimer(nextProcessingTimeTriggerTimestamp); | |
ValueState<Boolean> moreSinceFired = triggerContext.getPartitionedState(moreSinceFiredDescriptor); | |
if (accumulating) { | |
// register the cleanup timer if we are accumulating (and allow lateness) | |
if (allowedLateness > 0) { | |
triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp() + allowedLateness); | |
} | |
if (moreSinceFired.value()) { | |
moreSinceFired.update(false); | |
return TriggerResult.FIRE; | |
} else { | |
return TriggerResult.CONTINUE; | |
} | |
} else { | |
if (moreSinceFired.value()) { | |
moreSinceFired.update(false); | |
return TriggerResult.FIRE_AND_PURGE; | |
} else { | |
return TriggerResult.PURGE; | |
} | |
} | |
} else if (time == timeWindow.maxTimestamp() + allowedLateness) { | |
return TriggerResult.PURGE; | |
} | |
return TriggerResult.CONTINUE; | |
} | |
@Override | |
public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception { | |
// delete the registered processing time timer | |
ValueState<Long> processingTimeTrigger = triggerContext.getPartitionedState(processingTimeTriggerDescriptor); | |
long late = processingTimeTrigger.value(); | |
if(late != -1) { | |
triggerContext.deleteProcessingTimeTimer(late); | |
} | |
processingTimeTrigger.clear(); | |
// delete the flag showing that we are past the end_of_window in event time | |
ValueState<Boolean> isPastEndOfWindow = triggerContext.getPartitionedState(isPastTheEndOfWindow); | |
isPastEndOfWindow.clear(); | |
ValueState<Boolean> moreSinceFired = triggerContext.getPartitionedState(moreSinceFiredDescriptor); | |
moreSinceFired.clear(); | |
// finally delete the regisetered event time timers. | |
triggerContext.deleteEventTimeTimer(window.maxTimestamp()); | |
triggerContext.deleteEventTimeTimer(window.maxTimestamp() + allowedLateness); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment