Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save simpleusr/7c56d4384f6fc9f0a61860a680bb5f36 to your computer and use it in GitHub Desktop.
Save simpleusr/7c56d4384f6fc9f0a61860a680bb5f36 to your computer and use it in GitHub Desktop.
BaseCountAggregateBasedEarlyFiringTrigger
/**
 * Window trigger that fires once early when the number of elements seen for a window
 * reaches a configured threshold, and fires-and-purges when the event-time timer
 * registered for {@code window.maxTimestamp()} goes off.
 *
 * <p>Per-window trigger state kept through {@code TriggerContext#getPartitionedState}:
 * <ul>
 *   <li>{@code encounteredElementsCountState} - number of elements counted so far;</li>
 *   <li>{@code isEarlyTriggerState} - set to {@code true} when the early fire happens.
 *       This class only writes and clears it; it never reads it.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code logger} is a non-transient instance field. If the trigger is
 * serialized, the logger implementation has to be serializable as well - confirm for the
 * logging backend in use.
 *
 * @param <T> element type handled by the trigger
 */
public abstract class BaseCountAggregateBasedEarlyFiringTrigger<T extends IRequest> extends Trigger<T, TimeWindow> {

    private static final long serialVersionUID = 1L;

    /** Element count at which the single early fire is emitted; always positive. */
    private final int earlyFireThreshold;

    /** Descriptor of the per-window element counter; built once instead of on every callback. */
    private final ValueStateDescriptor<Integer> encounteredElementsCountDescriptor =
            new ValueStateDescriptor<>("encounteredElementsCountState", Integer.class);

    /** Descriptor of the per-window "early fire happened" flag; built once instead of on every callback. */
    private final ValueStateDescriptor<Boolean> isEarlyTriggerDescriptor =
            new ValueStateDescriptor<>("isEarlyTriggerState", Boolean.class);

    /** Logger named after the concrete subclass. */
    protected Logger logger;

    /**
     * Creates the trigger.
     *
     * @param earlyFireThreshold number of counted elements that causes the early fire; must be positive
     * @throws IllegalArgumentException if {@code earlyFireThreshold} is zero or negative
     */
    public BaseCountAggregateBasedEarlyFiringTrigger(int earlyFireThreshold) {
        // Validate before touching any field. IllegalArgumentException is still a
        // RuntimeException, so existing catch sites keep working.
        if (earlyFireThreshold <= 0) {
            throw new IllegalArgumentException(
                    String.format("earlyFireThreshold must be positive! currentValue : %s", earlyFireThreshold));
        }
        this.earlyFireThreshold = earlyFireThreshold;
        this.logger = LoggerFactory.getLogger(getClass());
    }

    /**
     * Counts the element and decides whether the window fires early.
     *
     * <p>The first counted element registers the event-time timer for
     * {@code window.maxTimestamp()}. When the count becomes exactly
     * {@code earlyFireThreshold}, {@code isEarlyTriggerState} is set and
     * {@link TriggerResult#FIRE} is returned; every other element yields
     * {@link TriggerResult#CONTINUE}.
     *
     * @param element   the incoming element
     * @param timestamp timestamp of the element (not used by this implementation)
     * @param window    window the element was assigned to
     * @param ctx       context giving access to trigger state, timers and the watermark
     * @return {@code FIRE} on reaching the threshold, otherwise {@code CONTINUE}
     * @throws Exception any failure from state or timer access; logged and rethrown
     */
    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        try {
            ValueState<Integer> countState = ctx.getPartitionedState(encounteredElementsCountDescriptor);
            // Single state read per element; the value is reused for tracing and counting.
            Integer previousCount = countState.value();

            // Per-element trace: DEBUG level and parameterized, so nothing is formatted
            // (and element.toString() is not invoked) unless debug logging is enabled.
            if (logger.isDebugEnabled()) {
                logger.debug("onElement - element: {} window.hashCode(): {} ctx.getCurrentWatermark(): {}"
                                + " w.getStart(): {} w.getEnd(): {} encounteredElementsCountState.value(): {}"
                                + " threadName : {}",
                        element,
                        window.hashCode(),
                        ctx.getCurrentWatermark(),
                        window.getStart(),
                        window.getEnd(),
                        previousCount,
                        Thread.currentThread());
            }

            // NOTE(review): original author marked this guard "SHOULD BE REMOVED???".
            // Presumably it ignores elements that arrive before the first watermark - TODO confirm.
            // Elements skipped here are neither counted nor do they register the end-of-window timer.
            if (ctx.getCurrentWatermark() < 0) {
                logger.debug("onElement processing skipped for eventId : {} for watermark: {} ",
                        element.getEventId(), ctx.getCurrentWatermark());
                return TriggerResult.CONTINUE;
            }

            int encounteredElementsCount = (previousCount == null ? 0 : previousCount) + 1;
            countState.update(encounteredElementsCount);

            if (encounteredElementsCount == 1) {
                // First counted element: arm the timer for the end of the window.
                ctx.registerEventTimeTimer(window.maxTimestamp());
            }

            // Equality (not >=) on purpose: the early fire happens once per window.
            if (encounteredElementsCount == earlyFireThreshold) {
                logger.debug("onElement - triggering early fire for eventId : {} for watermark: {} ",
                        element.getEventId(), ctx.getCurrentWatermark());
                ctx.getPartitionedState(isEarlyTriggerDescriptor).update(true);
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        } catch (Exception e) {
            logger.error("onElement error", e);
            throw e;
        }
    }

    /**
     * Handles an event-time timer.
     *
     * <p>A timer at {@code window.maxTimestamp()} returns
     * {@link TriggerResult#FIRE_AND_PURGE}. Any other timestamp is logged as an error
     * and returns {@link TriggerResult#CONTINUE}; if it is earlier than
     * {@code window.maxTimestamp()}, the end-of-window timer is registered again.
     *
     * @param timestamp timestamp of the timer that fired
     * @param window    window the timer belongs to
     * @param ctx       context giving access to timers and the watermark
     * @return {@code FIRE_AND_PURGE} at the window's max timestamp, otherwise {@code CONTINUE}
     * @throws Exception any failure from timer access; logged and rethrown
     */
    @Override
    public TriggerResult onEventTime(long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("onEventTime - ts: {} ctx.getCurrentWatermark(): {} w.getStart(): {} w.getEnd(): {}"
                                + " threadName : {}",
                        timestamp,
                        ctx.getCurrentWatermark(),
                        window.getStart(),
                        window.getEnd(),
                        Thread.currentThread());
            }

            long windowMaxTimestamp = window.maxTimestamp();
            if (timestamp == windowMaxTimestamp) {
                logger.debug("onEventTime - windowMaxTimestamp returning TriggerResult.FIRE_AND_PURGE");
                return TriggerResult.FIRE_AND_PURGE;
            }

            logger.error("onEventTime - unexpected timestamp : {} encountered. windowStart : {}, windowEnd : {} ",
                    timestamp, window.getStart(), window.getEnd());

            // Re-arm the timer this trigger actually fires on and that clear() deletes
            // (window.maxTimestamp()), and only while that instant is still ahead of this
            // callback. The previous code re-armed window.getEnd(), which never matches the
            // fire check above, so that timer kept landing in this branch and re-registering
            // itself, and clear() never removed it.
            if (timestamp < windowMaxTimestamp) {
                ctx.registerEventTimeTimer(windowMaxTimestamp);
            }
            return TriggerResult.CONTINUE;
        } catch (Exception e) {
            logger.error("onEventTime error", e);
            throw e;
        }
    }

    /**
     * Processing-time timers are not used by this trigger.
     *
     * @param timestamp timestamp of the timer that fired (ignored)
     * @param window    window the timer belongs to (ignored)
     * @param ctx       trigger context (ignored)
     * @return always {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onProcessingTime(long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Releases everything this trigger holds for the window: deletes the event-time
     * timer at {@code window.maxTimestamp()} and clears both state entries.
     *
     * @param window window being cleaned up
     * @param ctx    context giving access to trigger state and timers
     * @throws Exception any failure from state or timer access; logged and rethrown
     */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("clear - ctx.getCurrentWatermark(): {} w.getStart(): {} w.getEnd(): {} threadName : {}",
                        ctx.getCurrentWatermark(),
                        window.getStart(),
                        window.getEnd(),
                        Thread.currentThread());
            }
            ctx.deleteEventTimeTimer(window.maxTimestamp());
            ctx.getPartitionedState(encounteredElementsCountDescriptor).clear();
            ctx.getPartitionedState(isEarlyTriggerDescriptor).clear();
        } catch (Exception e) {
            logger.error("clear error", e);
            throw e;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment