Skip to content

Instantly share code, notes, and snippets.

@vmarcinko
Created June 3, 2019 14:56
Show Gist options
  • Save vmarcinko/1940a7b7b212a19fb96f828a9ce92bef to your computer and use it in GitHub Desktop.
Save vmarcinko/1940a7b7b212a19fb96f828a9ce92bef to your computer and use it in GitHub Desktop.
public QueueConsumer(
QueueConsumerModule<?> queueConsumerModule,
RetryPolicy retryPolicy,
PlatformTransactionManager transactionManager,
int polledItemsLimit,
long pollingPeriodInSecs,
int processingThreadCount
) {
// ...
this.processingExecutorService = Executors.newFixedThreadPool(processingThreadCount);
}
public void processQueuedItems() {
try {
LocalDateTime now = LocalDateTime.now();
List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);
if (!itemIds.isEmpty()) {
logger.info("Fetched {} pending queued items", itemIds.size());
List<Callable<Object>> itemProcessingCallables = constructItemProcessingCallables(itemIds);
this.processingExecutorService.invokeAll(itemProcessingCallables);
}
} catch (Throwable th) {
logger.error("Error while fetching queued items: " + th.getMessage(), th);
}
}
private List<Callable<Object>> constructItemProcessingCallables(List<?> itemIds) {
return itemIds.stream()
.map(itemId -> Executors.callable(() -> processItemAndHandleErrorIfRaised(itemId)))
.collect(Collectors.toList());
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment