Skip to content

Instantly share code, notes, and snippets.

@isopropylcyanide
Last active April 12, 2021 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save isopropylcyanide/a2abee485c4d9b1f75fc4d9c235a8be9 to your computer and use it in GitHub Desktop.
Save isopropylcyanide/a2abee485c4d9b1f75fc4d9c235a8be9 to your computer and use it in GitHub Desktop.
/**
* A concurrent work executor that blocks for the final result after individual execution results
* are obtained. The results are fed into the queue represented by completion service as they are
* getting completed. Note that this behavior can be changed by using a single threaded executor
* <p>
* If execution of any individual work results in an exception, an exception is raised
*/
static class OutOfOrderConcurrentWorkExecutor implements ConcurrentWorkExecutor {
@Override
public <T, U, V> V splitJoin(int size, Iterable<U> iterable, Function<U, T> mapper, Collector<T, ?, V> collector) throws ExecutionException, InterruptedException {
ThreadFactory threadFactory = Executors.defaultThreadFactory();
ExecutorService executor = Executors.newCachedThreadPool(threadFactory); //create a cached thread pool ideal for short lived tasks
AtomicInteger submitTaskCount = new AtomicInteger(0); //to keep track of submitted tasks
CompletionService<T> service = new ExecutorCompletionService<>(executor);
try {
iterable.forEach(u -> { //note that iterable.size() must not be really large as cached thread pool is unbounded
//we typically restrict this size bound when we accept the request itself.
service.submit(() -> {
log.info("Submitting task f({}) on {}", u, Thread.currentThread().getName());
return mapper.apply(u);
}); //user specified mapper is invoked here to create a task and submitted
submitTaskCount.incrementAndGet();
});
} finally {
executor.shutdown(); // stop accepting any more tasks except for the ones that are submitted
}
List<T> results = new ArrayList<>();
for (int i = 0; i < Math.min(submitTaskCount.get(), size); i++) { //if we didn't do min(size, submitted) & size > submitted, it would lead to an
//infinite loop as take() blocks on an empty queue
T t;
try {
t = service.take().get();
log.info("Finished {}/{}: {} on {}", i, submitTaskCount.get(), t, Thread.currentThread().getName());
} catch (ExecutionException ex) {
log.error("Received error during computation for result in Thread [{}] {}", Thread.currentThread().getId(), ex.getMessage());
throw ex; //letting the application handle it
}
if (t != null) {
results.add(t); //collecting the out of order result
}
}
return results.stream().collect(collector); //applying user specified collector to the collection of intermediate results
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment