Skip to content

Instantly share code, notes, and snippets.

@shevek
Last active August 18, 2017 21:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shevek/5f8b3fad55c346ca1391349404a2c0fa to your computer and use it in GitHub Desktop.
Save shevek/5f8b3fad55c346ca1391349404a2c0fa to your computer and use it in GitHub Desktop.
/**
 * Submits tasks to an {@link Executor} and lets the caller wait for all of them with
 * {@link #await()}, which rethrows the first recorded task failure with later failures
 * attached as suppressed exceptions.
 *
 * <p>Completed tasks are reaped opportunistically on every {@link #execute(Callable)} call and
 * exhaustively in {@link #await()}. At most {@value #EXCEPTIONS_MAX} failures are retained
 * between calls to {@link #await()}; further failures are dropped.
 *
 * <p>NOTE(review): the outstanding counter is atomic and the failure list is synchronized, but
 * "poll a future, then decrement the counter" is not one atomic step, so this class is
 * presumably meant to be driven by a single submitting thread - confirm against callers.
 */
public class ExecutorManager {

    /**
     * Creates a bounded thread pool sized to the number of available processors.
     *
     * <p>The pool has {@code ncpus} core and maximum threads, a 30 second keep-alive that also
     * applies to core threads, and a work queue holding {@code ncpus * 10} tasks. When the pool
     * and queue are saturated, {@link ThreadPoolExecutor.CallerRunsPolicy} makes the submitting
     * thread run the task itself. Threads are daemon threads named {@code name + "-%d"}.
     *
     * @param name prefix used for the names of the pool's threads
     * @return the newly created executor service
     */
    @Nonnull
    public static ExecutorService newExecutorService(@Nonnull String name) {
        int ncpus = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(ncpus, ncpus, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ncpus * 10), new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build());
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    /** Maximum number of task failures retained between calls to {@link #await()}. */
    private static final int EXCEPTIONS_MAX = 100;

    /**
     * Shared by tasks of every result type, hence typed on {@code Object}; the per-task result
     * type is restored by an unchecked cast in {@link #execute(Callable)}.
     */
    private final ExecutorCompletionService<Object> completionService;

    /** Number of submitted tasks whose futures have not yet been reaped. Never negative. */
    private final AtomicInteger outstanding = new AtomicInteger(0);

    /** Failures of reaped tasks, guarded by synchronizing on the list itself. */
    private final List<Exception> exceptions = new ArrayList<>(EXCEPTIONS_MAX);

    /**
     * Creates a manager that runs its tasks on the given executor.
     *
     * @param executor the executor that tasks are submitted to
     */
    public ExecutorManager(@Nonnull Executor executor) {
        this.completionService = new ExecutorCompletionService<>(executor);
    }

    /**
     * Reaps one finished future, recording its failure (if any) for a later {@link #await()}.
     *
     * <p>A future that was cancelled makes {@code Future.get()} throw an unchecked exception,
     * which is not caught here and therefore propagates to the caller.
     */
    private void get(@Nonnull Future<?> f) {
        try {
            f.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            record(e);
        } catch (ExecutionException e) {
            record(e);
        }
    }

    /** Stores a failure unless the retention limit has already been reached. */
    private void record(@Nonnull Exception e) {
        synchronized (exceptions) {
            if (exceptions.size() < EXCEPTIONS_MAX) {
                exceptions.add(e);
            }
        }
    }

    /**
     * Atomically decrements the outstanding counter if, and only if, it is positive.
     *
     * @return true if one outstanding task was claimed by the caller
     */
    private boolean claimOutstanding() {
        for (;;) {
            int current = outstanding.get();
            if (current <= 0) {
                return false;
            }
            if (outstanding.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    /**
     * Reaps any tasks that have already finished, then submits the given task.
     *
     * @param r the task to run
     * @param <V> the task's result type
     * @return a future for the task's result
     * @throws ExecutionException declared for interface compatibility; failures of earlier tasks
     *         are recorded and reported by {@link #await()} rather than thrown here
     */
    @Nonnull
    public <V> Future<V> execute(@Nonnull Callable<V> r) throws ExecutionException {
        // Reap whatever has finished so far without blocking.
        for (;;) {
            Future<?> done = completionService.poll();
            if (done == null) {
                break;
            }
            outstanding.getAndDecrement();
            get(done);
        }

        // Safe: the callable is only ever invoked, and its result is handed back through the
        // future returned below, which is cast back to the same V.
        @SuppressWarnings("unchecked")
        Callable<Object> task = (Callable<Object>) (Callable<?>) r;

        // Count the task before submitting so that it is never reaped while uncounted, but undo
        // the count if submission fails; otherwise await() would wait forever for a task that
        // was never queued.
        outstanding.getAndIncrement();
        boolean submitted = false;
        try {
            @SuppressWarnings("unchecked")
            Future<V> future = (Future<V>) (Future<?>) completionService.submit(task);
            submitted = true;
            return future;
        } finally {
            if (!submitted) {
                outstanding.getAndDecrement();
            }
        }
    }

    /**
     * Blocks until every outstanding task has finished, then reports recorded failures.
     *
     * <p>If any failures were recorded, the first one is thrown with the remaining ones attached
     * as suppressed exceptions, and the record is cleared.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting, or if the
     *         first recorded failure is an {@code InterruptedException}
     * @throws ExecutionException if the first recorded failure is a task failure
     */
    public void await() throws InterruptedException, ExecutionException {
        while (claimOutstanding()) {
            Future<?> f;
            try {
                f = completionService.take();
            } catch (InterruptedException e) {
                // Nothing was reaped, so hand the claimed slot back before propagating.
                outstanding.getAndIncrement();
                throw e;
            }
            get(f);
        }
        synchronized (exceptions) {
            if (exceptions.isEmpty()) {
                return;
            }
            Exception e = exceptions.get(0);
            for (int i = 1; i < exceptions.size(); i++) {
                e.addSuppressed(exceptions.get(i));
            }
            exceptions.clear();
            Throwables.propagateIfPossible(e, InterruptedException.class, ExecutionException.class);
            throw Throwables.propagate(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment