Skip to content

Instantly share code, notes, and snippets.

@loganmzz
Created October 8, 2015 22:41
Show Gist options
  • Save loganmzz/fb73b85efe2f74df457b to your computer and use it in GitHub Desktop.
Save loganmzz/fb73b85efe2f74df457b to your computer and use it in GitHub Desktop.
Multithread run helper
/**
* <p>Multi-threaded run helper.</p>
* <p>Based on few parameters, launch a fixed number of task and wait a given amount of time before
* interrupting them and give back execution results</p>
*/
public class Executor {
/** Makes tasks starting at the same time **/
public boolean syncStartup = true;
/** Number of task to run in parallel **/
public int parallel = Runtime.getRuntime().availableProcessors();
/** Thread name generator **/
public IntFunction<String> threadName = i -> String.format("Executor-%d", i);
/** Shutdown time out **/
public long shutdownTimeout = 1;
public TimeUnit shutdownUnit = TimeUnit.SECONDS;
/**
* Unchecked barrier await.
* @see CyclicBarrier#await()
*/
private void await(CyclicBarrier barrier) {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
/**
* <p>Launch many <code>cmd</code> in {@link #parallel}, waits given <code>duration</code>
* before interrupting them, finally returns all generated results.</p>
* <p>Tasks can check for "running" status by {@link BooleanSupplier} which is passes in.
* After <code>duration</code>, multi-thread architecture is shutdown, making execution thread
* to be interrupted. Task should complete before {@link #shutdownTimeout} occurs, else a
* an {@link IllegalStateException} is thrown. Same apply if any task terminate in error.</p>
* @param cmd Task to run. Supplied boolean is <code>true</code> while run is active.
* @param duration Duration to wait before interrupting the run.
* @param unit Duration unit.
* @return Result of all {@link #parallel} <code>cmd</code>.
* @throws InterruptedException If interrupted on a wait (<code>duration</code>, {@link #shutdownTimeout} or {@link Future#get() getting result}).
* @throws IllegalStateException If task haven't complet after {@link #shutdownTimeout} or at least one ends in error.
*/
public <T> List<T> launch(Function<BooleanSupplier, T> cmd, long duration, TimeUnit unit) throws InterruptedException, IllegalStateException {
List<T> results = new ArrayList<>(parallel);
AtomicInteger sequence = new AtomicInteger();
ExecutorService executor = Executors.newFixedThreadPool(parallel, run -> new Thread(run, threadName.apply(sequence.incrementAndGet())));
try {
// Launch tasks
List<Future<T>> futures = new ArrayList<>(parallel);
BooleanSupplier active = () -> !executor.isShutdown();
Optional<CyclicBarrier> barrier = Optional.ofNullable(syncStartup ? new CyclicBarrier(parallel) : null);
Callable<T> task = () -> { barrier.ifPresent(this::await); return cmd.apply(active); };
for (int i = 0; i < parallel; i++) {
Future<T> future = executor.submit(task);
futures.add(future);
}
// Wait
unit.sleep(duration);
barrier.ifPresent(CyclicBarrier::reset);
executor.shutdownNow();
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
throw new IllegalStateException("Tasks not terminated after " + shutdownTimeout + " " + shutdownUnit);
}
// Collect
int index = 1;
IllegalStateException ex = null;
for (Future<T> future : futures) {
try {
results.add(future.get());
} catch (ExecutionException e) {
if (ex == null) {
ex = new IllegalStateException("Can't retrieve results");
}
ex.addSuppressed(new IllegalStateException("Error on task #" + index, e));
}
index++;
}
if (ex != null) {
throw ex;
}
} finally {
if (!executor.isTerminated()) {
executor.shutdownNow();
}
}
return results;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment