Skip to content

Instantly share code, notes, and snippets.

@forcemax
Created July 21, 2015 07:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save forcemax/3a76166277bc90941d4d to your computer and use it in GitHub Desktop.
Save forcemax/3a76166277bc90941d4d to your computer and use it in GitHub Desktop.
public void execute(Tuple input) {
if (isTickTuple(input)) {
LOGGER.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
} else {
countObj(input);
}
collector.ack(input);
}
private void emitCurrentWindowCounts() {
Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
lastModifiedTracker.markAsModified();
if (actualWindowLengthInSeconds < windowLengthInSeconds-1) {
LOGGER.debug(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
}
emit(counts, actualWindowLengthInSeconds);
}
private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
for (Entry<Object, Long> entry : counts.entrySet()) {
Object obj = entry.getKey();
Long count = entry.getValue();
collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment