Skip to content

Instantly share code, notes, and snippets.

@mmimica
Created April 7, 2019 21:08
Show Gist options
  • Save mmimica/5c7d3dcf6036fecf097dc3c534b04b63 to your computer and use it in GitHub Desktop.
Save mmimica/5c7d3dcf6036fecf097dc3c534b04b63 to your computer and use it in GitHub Desktop.
public class BatchingAggregator<T> {
private final int maxBatchSize;
private final InterruptableConsumer<T> consumer;
private final BlockingQueue<T> queue = new LinkedBlockingDeque<>(maxQueueSize);
private final Thread thread = new Thread(this::process);
{
this.thread.start();
}
@FunctionalInterface
public interface InterruptableConsumer<T> {
void accept(List<T> t) throws InterruptedException;
}
private void process() {
while (!Thread.interrupted()) {
try {
T item = queue.take();
List<T> bucket = new ArrayList<>();
bucket.add(item);
while (bucket.size() < maxBatchSize && (item = queue.poll()) != null) {
bucket.add(item);
}
consumer.accept(bucket);
} catch (InterruptedException e) {
LOGGER.info("Interrupted");
break;
}
catch (Exception ex) {
LOGGER.info("Error while processing bucket queue", ex);
}
}
}
public boolean feed(T item) {
return queue.offer(item);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment