Created
October 8, 2015 22:41
-
-
Save loganmzz/fb73b85efe2f74df457b to your computer and use it in GitHub Desktop.
Multithread run helper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* <p>Multi-threaded run helper.</p> | |
* <p>Based on few parameters, launch a fixed number of task and wait a given amount of time before | |
* interrupting them and give back execution results</p> | |
*/ | |
public class Executor { | |
/** Makes tasks starting at the same time **/ | |
public boolean syncStartup = true; | |
/** Number of task to run in parallel **/ | |
public int parallel = Runtime.getRuntime().availableProcessors(); | |
/** Thread name generator **/ | |
public IntFunction<String> threadName = i -> String.format("Executor-%d", i); | |
/** Shutdown time out **/ | |
public long shutdownTimeout = 1; | |
public TimeUnit shutdownUnit = TimeUnit.SECONDS; | |
/** | |
* Unchecked barrier await. | |
* @see CyclicBarrier#await() | |
*/ | |
private void await(CyclicBarrier barrier) { | |
try { | |
barrier.await(); | |
} catch (InterruptedException | BrokenBarrierException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
/** | |
* <p>Launch many <code>cmd</code> in {@link #parallel}, waits given <code>duration</code> | |
* before interrupting them, finally returns all generated results.</p> | |
* <p>Tasks can check for "running" status by {@link BooleanSupplier} which is passes in. | |
* After <code>duration</code>, multi-thread architecture is shutdown, making execution thread | |
* to be interrupted. Task should complete before {@link #shutdownTimeout} occurs, else a | |
* an {@link IllegalStateException} is thrown. Same apply if any task terminate in error.</p> | |
* @param cmd Task to run. Supplied boolean is <code>true</code> while run is active. | |
* @param duration Duration to wait before interrupting the run. | |
* @param unit Duration unit. | |
* @return Result of all {@link #parallel} <code>cmd</code>. | |
* @throws InterruptedException If interrupted on a wait (<code>duration</code>, {@link #shutdownTimeout} or {@link Future#get() getting result}). | |
* @throws IllegalStateException If task haven't complet after {@link #shutdownTimeout} or at least one ends in error. | |
*/ | |
public <T> List<T> launch(Function<BooleanSupplier, T> cmd, long duration, TimeUnit unit) throws InterruptedException, IllegalStateException { | |
List<T> results = new ArrayList<>(parallel); | |
AtomicInteger sequence = new AtomicInteger(); | |
ExecutorService executor = Executors.newFixedThreadPool(parallel, run -> new Thread(run, threadName.apply(sequence.incrementAndGet()))); | |
try { | |
// Launch tasks | |
List<Future<T>> futures = new ArrayList<>(parallel); | |
BooleanSupplier active = () -> !executor.isShutdown(); | |
Optional<CyclicBarrier> barrier = Optional.ofNullable(syncStartup ? new CyclicBarrier(parallel) : null); | |
Callable<T> task = () -> { barrier.ifPresent(this::await); return cmd.apply(active); }; | |
for (int i = 0; i < parallel; i++) { | |
Future<T> future = executor.submit(task); | |
futures.add(future); | |
} | |
// Wait | |
unit.sleep(duration); | |
barrier.ifPresent(CyclicBarrier::reset); | |
executor.shutdownNow(); | |
if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { | |
throw new IllegalStateException("Tasks not terminated after " + shutdownTimeout + " " + shutdownUnit); | |
} | |
// Collect | |
int index = 1; | |
IllegalStateException ex = null; | |
for (Future<T> future : futures) { | |
try { | |
results.add(future.get()); | |
} catch (ExecutionException e) { | |
if (ex == null) { | |
ex = new IllegalStateException("Can't retrieve results"); | |
} | |
ex.addSuppressed(new IllegalStateException("Error on task #" + index, e)); | |
} | |
index++; | |
} | |
if (ex != null) { | |
throw ex; | |
} | |
} finally { | |
if (!executor.isTerminated()) { | |
executor.shutdownNow(); | |
} | |
} | |
return results; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment