Skip to content

Instantly share code, notes, and snippets.

@hortega
Last active January 21, 2020 22:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hortega/f81df4efa836a0fab6f53cf132cc10a9 to your computer and use it in GitHub Desktop.
Save hortega/f81df4efa836a0fab6f53cf132cc10a9 to your computer and use it in GitHub Desktop.
ExecutorCompletionService example to asynchronously process elements from a queue
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.*;
/**
 * Drains a queue of integers by handing each element to a {@link Worker} running on a
 * fixed-size thread pool and printing every result as soon as it becomes available.
 *
 * <p>Results are collected through an {@link ExecutorCompletionService}, so they are
 * printed in completion order, which is not necessarily submission order.
 *
 * <p>The pool threads are owned by this instance; call {@link #shutdown()} once the
 * consumer is no longer needed so they can terminate.
 */
public class QueueConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueConsumer.class);

    /** Number of threads in the pool that runs the workers. */
    private static final int WORKER_THREADS = 5;

    private final Queue<Integer> queue;
    private final ExecutorService executorService;
    private final CompletionService<Result> completionService;

    /**
     * Creates a consumer backed by its own fixed thread pool.
     *
     * @param queue the queue whose elements will be processed by {@link #consumeBatch()}
     */
    public QueueConsumer(Queue<Integer> queue) {
        this.queue = queue;
        this.executorService = Executors.newFixedThreadPool(WORKER_THREADS);
        this.completionService = new ExecutorCompletionService<>(executorService);
    }

    /**
     * Submits one task per element currently in the queue and blocks until every
     * submitted task has completed, printing each successful result to standard output.
     *
     * <p>A failed task is logged and counted as completed; it does not stop the batch.
     * If the calling thread is interrupted while waiting, the batch is still drained
     * (so no stale results are left behind for the next call) and the interrupt status
     * is restored before this method returns.
     */
    public void consumeBatch() {
        int pending = submitTasks();
        boolean interrupted = false;
        try {
            while (pending > 0) {
                final Future<Result> finished;
                try {
                    finished = completionService.take();
                } catch (InterruptedException e) {
                    // Nothing was taken, so the pending count stays as it is.
                    interrupted = true;
                    LOGGER.error("Interrupted while waiting for a result; continuing to drain the batch", e);
                    continue;
                }
                // The task is done whether it succeeded or failed. Counting it here,
                // before get(), keeps a failing task from leaving the loop waiting
                // forever on a completion that will never arrive.
                pending--;
                try {
                    // print, not printf: the result is data, not a format string.
                    System.out.print(finished.get().getResultOfLengthyTask());
                } catch (InterruptedException e) {
                    interrupted = true;
                    LOGGER.error("Interrupted while reading a completed result", e);
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof ManageableException) {
                        // we can recover from this, but we still log it
                        LOGGER.warn("Something didn't go quite well...", e);
                    } else {
                        // not much we can do
                        LOGGER.error("Something went really wrong...", e);
                    }
                }
            }
        } finally {
            if (interrupted) {
                // Re-assert the interrupt so callers further up the stack can see it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Initiates an orderly shutdown of the underlying thread pool: tasks already
     * submitted still run, but later submissions are rejected.
     */
    public void shutdown() {
        executorService.shutdown();
    }

    /**
     * Submits a {@link Worker} for each queued element until the queue is empty or the
     * executor rejects a task.
     *
     * @return the number of tasks that were accepted by the executor
     */
    private int submitTasks() {
        int submitted = 0;
        // Peek first and poll only after a successful submit, so an element whose
        // task is rejected stays in the queue instead of being lost.
        for (Integer next = queue.peek(); next != null; next = queue.peek()) {
            try {
                completionService.submit(new Worker(next));
            } catch (RejectedExecutionException e) {
                // executor can't accept any more tasks (e.g. after shutdown()), so we stop submitting
                LOGGER.warn("Can't submit anymore tasks...", e);
                break;
            }
            queue.poll();
            submitted++;
        }
        return submitted;
    }

    /** Failure of a task that the consumer treats as recoverable (logged at WARN). */
    static class ManageableException extends RuntimeException {
    }

    /** Immutable holder for the outcome of a {@link Worker}. */
    static class Result {
        private final String resultOfLengthyTask;

        Result(String resultOfLengthyTask) {
            this.resultOfLengthyTask = resultOfLengthyTask;
        }

        public String getResultOfLengthyTask() {
            return resultOfLengthyTask;
        }
    }

    /** Task that simulates a lengthy computation on a single queue element. */
    static class Worker implements Callable<Result> {
        private final Integer importantIntegerForOurLengthyTask;

        public Worker(Integer importantIntegerForOurLengthyTask) {
            this.importantIntegerForOurLengthyTask = importantIntegerForOurLengthyTask;
        }

        /**
         * Sleeps for a random 0-9 ms to stand in for real work, then wraps the element
         * in a {@link Result}.
         *
         * @return the result for this worker's element
         * @throws Exception if the sleep is interrupted
         */
        @Override
        public Result call() throws Exception {
            // here we do the stuff that takes a while and return the result
            Thread.sleep(ThreadLocalRandom.current().nextInt(10));
            return new Result("woooow " + importantIntegerForOurLengthyTask);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment