Skip to content

Instantly share code, notes, and snippets.

@tomwhoiscontrary
Created April 26, 2018 15:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomwhoiscontrary/acb12b2c30581fbbb586534fdef1ac0d to your computer and use it in GitHub Desktop.
Save tomwhoiscontrary/acb12b2c30581fbbb586534fdef1ac0d to your computer and use it in GitHub Desktop.
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public class Nursery {
public static void run(int terminationTimeout, TimeUnit terminationTimeUnit, Consumer<ExecutorService> block) throws InterruptedException, TimeoutException, ExecutionException {
AtomicReference<Throwable> nurseryError = new AtomicReference<>();
ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>()) {
AtomicInteger live = new AtomicInteger(0);
@Override
public void execute(Runnable command) {
live.incrementAndGet();
super.execute(command);
}
@Override
protected void afterExecute(Runnable task, Throwable taskError) {
int liveNow = live.decrementAndGet();
if (liveNow == 0) shutdown();
if (taskError != null) {
Throwable previousError = nurseryError.getAndSet(taskError);
if (previousError == null) shutdownNow();
}
}
};
executorService.execute(() -> block.accept(executorService));
boolean terminated = executorService.awaitTermination(terminationTimeout, terminationTimeUnit);
if (!terminated) {
executorService.shutdownNow();
throw new TimeoutException();
}
Throwable throwable = nurseryError.get();
if (throwable != null) throw new ExecutionException(throwable);
}
public static void main(String[] args) {
boolean hang = false;
boolean blowUp = false;
Thread.setDefaultUncaughtExceptionHandler((t, e) -> report("DEATH OF " + t.getName(), e));
System.out.println("main starting");
try {
Nursery.run(1, TimeUnit.SECONDS, exec -> {
System.out.println("block starting");
exec.execute(() -> {
System.out.println("task 1 starting");
sleepQuietly(100);
System.out.println("task 1 done");
});
sleepQuietly(200);
exec.execute(() -> {
System.out.println("task 2 starting");
sleepQuietly(200);
System.out.println("task 2 done");
});
exec.execute(() -> {
System.out.println("task 3 starting");
sleepQuietly(100);
System.out.println("task 3 done");
});
if (hang) sleepQuietly(2000);
if (blowUp) throw new RuntimeException("oh no");
System.out.println("block done");
});
} catch (InterruptedException e) {
report("DIED BY INTERRUPTION", e);
} catch (TimeoutException e) {
report("DIED BY TIMEOUT", e);
} catch (ExecutionException e) {
report("DIED BY EXECUTION", e);
}
System.out.println("main done");
}
private static void report(String label, Throwable e) {
synchronized (System.err) {
System.err.print(label + ": ");
e.printStackTrace(System.err);
}
}
private static void sleepQuietly(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment