Last active
August 18, 2017 21:35
-
-
Save shevek/5f8b3fad55c346ca1391349404a2c0fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ExecutorManager { | |
@Nonnull | |
public static ExecutorService newExecutorService(@Nonnull String name) { | |
int ncpus = Runtime.getRuntime().availableProcessors(); | |
ThreadPoolExecutor executor = new ThreadPoolExecutor(ncpus, ncpus, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(ncpus * 10), new ThreadPoolExecutor.CallerRunsPolicy()); | |
executor.setThreadFactory(new ThreadFactoryBuilder().setNameFormat(name + "-%d").setDaemon(true).build()); | |
executor.allowCoreThreadTimeOut(true); | |
return executor; | |
} | |
private static final int EXCEPTIONS_MAX = 100; | |
private final ExecutorCompletionService completionService; | |
private final AtomicInteger outstanding = new AtomicInteger(0); | |
private final List<Exception> exceptions = new ArrayList<>(EXCEPTIONS_MAX); | |
public ExecutorManager(@Nonnull Executor executor) { | |
this.completionService = new ExecutorCompletionService(executor); | |
} | |
private void get(@Nonnull Future<?> f) { | |
try { | |
f.get(); | |
} catch (InterruptedException | ExecutionException e) { | |
synchronized (exceptions) { | |
if (exceptions.size() < EXCEPTIONS_MAX) | |
exceptions.add(e); | |
} | |
} | |
} | |
@Nonnull | |
public <V> Future<V> execute(@Nonnull Callable<V> r) throws ExecutionException { | |
for (;;) { | |
Future<V> f = completionService.poll(); | |
if (f == null) | |
break; | |
outstanding.getAndDecrement(); | |
get(f); | |
} | |
outstanding.getAndIncrement(); | |
return completionService.submit(r); | |
} | |
public void await() throws InterruptedException, ExecutionException { | |
while (outstanding.getAndDecrement() > 0) { | |
Future<?> f = completionService.take(); | |
get(f); | |
} | |
synchronized (exceptions) { | |
if (exceptions.isEmpty()) | |
return; | |
Exception e = exceptions.get(0); | |
for (int i = 1; i < exceptions.size(); i++) | |
e.addSuppressed(exceptions.get(i)); | |
exceptions.clear(); | |
Throwables.propagateIfPossible(e, InterruptedException.class, ExecutionException.class); | |
throw Throwables.propagate(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment