Skip to content

Instantly share code, notes, and snippets.

@navkast
Created November 16, 2016 04:29
Show Gist options
  • Save navkast/ca08279266c27acefa7d260779d96163 to your computer and use it in GitHub Desktop.
Save navkast/ca08279266c27acefa7d260779d96163 to your computer and use it in GitHub Desktop.
CyclicBarrierRunner: A Runnable executor that executes all submitted jobs at the exact same time. Helpful for writing concurrency tests.
package com.navkast.concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* A {@link Runnable} executor that executes all submitted jobs at the exact same time. Helpful for
* writing concurrency tests.
* <p>
* Usage:
* <pre>
* {@code CyclicBarrierRunner runner = new CyclicBarrierRunner(1, TimeUnit.SECONDS);
* runner.submit(() -> concurrentPerfTest_1()); // submit job, but don't start.
* runner.submit(() -> concurrentPerfTest_2()); // submit job, but don't start.
* runner.submit(() -> concurrentPerfTest_3()); // submit job, but don't start.
* boolean completed = runner.execute(); // start every job at the same time.
* // Wait until all jobs are complete, or until 1 second
* // has elapsed.
* if (!completed) {
* throw new PerformanceRegressionException("Failed to complete concurrent perf test on time!");
* }
* }
* </pre>
*
* @author navkast.
*/
public final class CyclicBarrierRunner {
private final List<Runnable> runnables;
private final long timeout;
private final TimeUnit type;
private final AtomicReference<Throwable> exception = new AtomicReference<>();
private final ThreadFactory threadFactory;
// Latch for counting daemon threads.
private CountDownLatch latch;
/**
* Creates a {@link CyclicBarrierRunner} that blocks for the specified amount of time for all
* jobs to complete.
*/
public CyclicBarrierRunner(long timeout, TimeUnit type) {
this.runnables = new ArrayList<>();
this.timeout = timeout;
this.type = type;
this.threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("cyclic-barrier-runner-%d")
.setUncaughtExceptionHandler((t, e) -> setException(e))
.build();
}
/**
* Submits a job.
*/
public void submit(Runnable runnable) {
runnables.add(runnable);
}
/**
* Executes all jobs submitted. Blocks until all jobs are executed for the time specified upon
* construction. If any of the jobs complete exceptionally, then the rest of the jobs are abandoned
* and the exception is propagated to the main thread as a RuntimeException.
*
* @return whether all jobs completed on time.
* @throws RuntimeException if any of the jobs complete exceptionally.
*/
public boolean execute() {
int size = runnables.size();
latch = new CountDownLatch(size);
CyclicBarrier barrier = new CyclicBarrier(size + 1); // + 1 for final barrier.
// Modify all runnables with barriers and latches
List<Runnable> modifiedRunnables = new ArrayList<>(size);
for (Runnable runnable : runnables) {
modifiedRunnables.add(() -> {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
runnable.run();
latch.countDown();
});
}
// Queue up all jobs for running.
for (Runnable runnable : modifiedRunnables) {
threadFactory.newThread(runnable).start();
}
// Run the jobs
boolean success;
try {
barrier.await();
// block until everything is done.
success = latch.await(timeout, type);
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
Throwable ex = exception.get();
if (ex != null) {
// Propagate exception to main thread.
throw new RuntimeException(ex);
}
return success;
}
private synchronized void setException(Throwable throwable) {
exception.set(throwable);
// Fail every other task fast.
while (latch.getCount() > 0) {
latch.countDown();
}
}
}
package com.navkast.concurrent
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.testng.annotations.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* @author navkast
*/
public class CyclicBarrierRunnerTest {
@Test(invocationCount = 100)
public void testCyclicBarrierRunner() {
AtomicInteger integer = new AtomicInteger(0);
CyclicBarrierRunner runner = new CyclicBarrierRunner(1, TimeUnit.SECONDS);
// submit 3 jobs
runner.submit(integer::incrementAndGet);
runner.submit(integer::incrementAndGet);
runner.submit(integer::incrementAndGet);
runner.execute();
assertThat(integer.get(), is(3));
}
@Test(expectedExceptions = RuntimeException.class)
public void testCyclicBarrierRunnerWithError() {
CyclicBarrierRunner runner = new CyclicBarrierRunner(10, TimeUnit.DAYS);
runner.submit(() -> {
try {
// This job will sleep for basically forever, but the other job's exception will cause
// the whole barrier runner to fail immediately.
TimeUnit.DAYS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
runner.submit(() -> { throw new RuntimeException(); });
runner.execute();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment