@vmarcinko
Last active May 22, 2019 06:51
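// Polls for queued items that are due for their next attempt and processes them one by one.
public void processQueuedItems() {
    try {
        LocalDateTime now = LocalDateTime.now();
        // Fetch the IDs of items whose next attempt time is before now, limited by itemsPollSize.
        List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);
        if (!itemIds.isEmpty()) {
            logger.info("Fetched {} pending queued items", itemIds.size());
            for (Object itemId : itemIds) {
                processItemAndHandleErrorIfRaised(itemId);
            }
        }
    } catch (Throwable th) {
        // Reached if the fetch fails, or if registering a processing failure itself throws
        // (which also aborts the rest of this batch). Log it and let the next poll retry.
        logger.error("Error while fetching queued items: " + th.getMessage(), th);
    }
}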
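
// Processes a single item in its own transaction. If processing fails, the failure
// is recorded in a separate transaction so that it is not rolled back with the failed work.
private void processItemAndHandleErrorIfRaised(Object itemId) {
    try {
        executeUnderTransaction(() -> processItem(itemId));
    } catch (Throwable error) {
        executeUnderTransaction(() -> registerProcessingFailure(itemId, error));
    }
}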
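
The snippet above depends on helpers that are not shown (`executeUnderTransaction`, `processItem`, `registerProcessingFailure`). The following is a minimal, self-contained sketch of the same per-item error handling, assuming in-memory stand-ins for those helpers. The class name `QueueProcessingSketch`, the `String` item IDs, the `processAll` method, and the rule that IDs starting with `bad` fail are all hypothetical and exist only for illustration. The stand-in `executeUnderTransaction` simply runs the work, whereas real code would begin, commit, or roll back a transaction around it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class QueueProcessingSketch {
    // In-memory stand-ins for whatever the real processing and failure bookkeeping would persist.
    final List<String> processed = new ArrayList<>();
    final List<String> failed = new ArrayList<>();

    // Hypothetical stand-in: just runs the work. A real version would wrap it in a transaction.
    void executeUnderTransaction(Runnable work) {
        work.run();
    }

    // Hypothetical processing step: IDs starting with "bad" are treated as failing.
    void processItem(String itemId) {
        if (itemId.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + itemId);
        }
        processed.add(itemId);
    }

    // Hypothetical failure bookkeeping: records the item ID and the error message.
    void registerProcessingFailure(String itemId, Throwable error) {
        failed.add(itemId + ": " + error.getMessage());
    }

    // Same shape as the original method: process the item, and on any error record the failure.
    void processItemAndHandleErrorIfRaised(String itemId) {
        try {
            executeUnderTransaction(() -> processItem(itemId));
        } catch (Throwable error) {
            executeUnderTransaction(() -> registerProcessingFailure(itemId, error));
        }
    }

    // Processes every item; a failure in one item does not stop the ones after it.
    void processAll(List<String> itemIds) {
        for (String itemId : itemIds) {
            processItemAndHandleErrorIfRaised(itemId);
        }
    }

    public static void main(String[] args) {
        QueueProcessingSketch sketch = new QueueProcessingSketch();
        sketch.processAll(Arrays.asList("a", "bad-1", "b"));
        System.out.println("processed = " + sketch.processed);
        System.out.println("failed = " + sketch.failed);
    }
}
```

In this sketch the failing item is recorded and the loop continues with the remaining items, because each item's error is caught inside `processItemAndHandleErrorIfRaised` rather than in the loop itself.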