Skip to content

Instantly share code, notes, and snippets.

@vmarcinko
Last active June 3, 2019 15:34
Show Gist options
  • Save vmarcinko/fbf9dfb70867efd8e8fed9834564b9cf to your computer and use it in GitHub Desktop.
Save vmarcinko/fbf9dfb70867efd8e8fed9834564b9cf to your computer and use it in GitHub Desktop.
public QueueConsumer(
QueueConsumerModule<?> queueConsumerModule,
RetryPolicy retryPolicy,
PlatformTransactionManager transactionManager,
int polledItemsLimit,
long pollingPeriodInSecs,
int partitionCount
) {
// ...
this.scheduledExecutorService = Executors.newScheduledThreadPool(partitionCount);
}
// ...
private void startProcessingTasks() {
logger.info("Starting {} queue polling tasks with delay of {} secs", partitionCount, pollingPeriodInSecs);
Set<ScheduledFuture<?>> tasks = new HashSet<>();
for (int i = 0; i < partitionCount; i++) {
final int partition = i; // lambda requires 'final' variable, and loop requires it to be incremented, so we have to do it this way
Runnable command = () -> processQueuedItems(partition);
ScheduledFuture<?> task = this.scheduledExecutorService.scheduleWithFixedDelay(command, pollingPeriodInSecs, pollingPeriodInSecs, TimeUnit.SECONDS);
tasks.add(task);
}
this.processingTasks = tasks;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment