Created
July 25, 2019 15:25
-
-
Save simpleusr/7c56d4384f6fc9f0a61860a680bb5f36 to your computer and use it in GitHub Desktop.
BaseCountAggregateBasedEarlyFiringTrigger
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public abstract class BaseCountAggregateBasedEarlyFiringTrigger<T extends IRequest> extends Trigger<T, TimeWindow> { | |
private static final long serialVersionUID = 1L; | |
private final int earlyFireThreshold; | |
protected Logger logger; | |
public BaseCountAggregateBasedEarlyFiringTrigger(int earlyFireThreshold) { | |
this.earlyFireThreshold = earlyFireThreshold; | |
if (earlyFireThreshold <= 0) { | |
throw new RuntimeException(String.format("earlyFireThreshold must be positive! currentValue : %s", earlyFireThreshold)); | |
} | |
logger = LoggerFactory.getLogger(getClass()); | |
} | |
@Override | |
public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { | |
try { | |
ValueState<Integer> encounteredElementsCountState = ctx.getPartitionedState(new ValueStateDescriptor<>("encounteredElementsCountState", | |
Integer.class)); | |
// TODO REMOVE LOG | |
logger.info("onElement - element: " | |
+ element | |
+ " window.hashCode(): " | |
+ window.hashCode() | |
+ " ctx.getCurrentWatermark(): " | |
+ ctx.getCurrentWatermark() | |
+ " w.getStart(): " | |
+ window.getStart() | |
+ " w.getEnd(): " | |
+ window.getEnd() | |
+ " encounteredElementsCountState.value(): " | |
+ String.valueOf(encounteredElementsCountState.value()) | |
+ " threadName : " | |
+ Thread.currentThread()); | |
// TODO SHOULD BE REMOVED??? | |
if (ctx.getCurrentWatermark() < 0) { | |
logger.debug(String.format("onElement processing skipped for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark())); | |
return TriggerResult.CONTINUE; | |
} | |
if (encounteredElementsCountState.value() == null) { | |
encounteredElementsCountState.update(0); | |
} | |
encounteredElementsCountState.update(encounteredElementsCountState.value() + 1); | |
int encounteredElementsCount = encounteredElementsCountState.value().intValue(); | |
if (encounteredElementsCount == 1) { | |
// register timer for the end of the window | |
ctx.registerEventTimeTimer(window.maxTimestamp()); | |
} | |
if (encounteredElementsCount == earlyFireThreshold) { | |
logger.debug(String.format("onElement - triggering early fire for eventId : %s for watermark: %s ", element.getEventId(), ctx.getCurrentWatermark())); | |
ValueState<Boolean> isEarlyTriggerState = ctx.getPartitionedState(new ValueStateDescriptor<>("isEarlyTriggerState", Boolean.class)); | |
isEarlyTriggerState.update(true); | |
return TriggerResult.FIRE; | |
} | |
return TriggerResult.CONTINUE; | |
} catch (Exception e) { | |
logger.error("onElement error", e); | |
throw e; | |
} | |
} | |
@Override | |
public TriggerResult onEventTime(long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { | |
try { | |
// TODO REMOVE LOG | |
logger.info("onEventTime - ts: " | |
+ timestamp | |
+ " ctx.getCurrentWatermark(): " | |
+ ctx.getCurrentWatermark() | |
+ " w.getStart(): " | |
+ window.getStart() | |
+ " w.getEnd(): " | |
+ window.getEnd() | |
+ " threadName : " | |
+ Thread.currentThread()); | |
if (timestamp == window.maxTimestamp()) { | |
logger.debug("onEventTime - windowMaxTimestamp returning TriggerResult.FIRE_AND_PURGE"); | |
return TriggerResult.FIRE_AND_PURGE; | |
} else { | |
logger.error(String.format("onEventTime - unexpected timestamp : %s encountered. windowStart : %s, windowEnd : %s ", | |
timestamp, | |
window.getStart(), | |
window.getEnd())); | |
ctx.registerEventTimeTimer(window.getEnd()); | |
return TriggerResult.CONTINUE; | |
} | |
} catch (Exception e) { | |
logger.error("onEventTime error", e); | |
throw e; | |
} | |
} | |
@Override | |
public TriggerResult onProcessingTime(long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { | |
try { | |
return TriggerResult.CONTINUE; | |
} catch (Exception e) { | |
logger.error("onProcessingTime error", e); | |
throw e; | |
} | |
} | |
@Override | |
public void clear(TimeWindow window, TriggerContext ctx) throws Exception { | |
try { | |
// TODO REMOVE LOG | |
logger.info("clear - ctx.getCurrentWatermark(): " | |
+ ctx.getCurrentWatermark() | |
+ " w.getStart(): " | |
+ window.getStart() | |
+ " w.getEnd(): " | |
+ window.getEnd() | |
+ " threadName : " | |
+ Thread.currentThread()); | |
ctx.deleteEventTimeTimer(window.maxTimestamp()); | |
ValueState<Integer> encounteredElementsCountState = ctx.getPartitionedState(new ValueStateDescriptor<>("encounteredElementsCountState", | |
Integer.class)); | |
encounteredElementsCountState.clear(); | |
ValueState<Boolean> isEarlyTriggerState = ctx.getPartitionedState(new ValueStateDescriptor<>("isEarlyTriggerState", Boolean.class)); | |
isEarlyTriggerState.clear(); | |
} catch (Exception e) { | |
logger.error("clear error", e); | |
throw e; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment