Skip to content

Instantly share code, notes, and snippets.

@Christopher-Barham-AKQA
Created August 28, 2012 09:45
Show Gist options
  • Save Christopher-Barham-AKQA/3496645 to your computer and use it in GitHub Desktop.
Save Christopher-Barham-AKQA/3496645 to your computer and use it in GitHub Desktop.
assertConcurrent - junit assert method
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jpos.util.NameRegistrar.NotFoundException;
import org.junit.Test;

public class SomeConcurrencyTest {
    static final int TOTAL_THREADS_TO_RUN = 1000;

    @Test
    public void testConcurrency() throws Exception {
        List<Runnable> parrallelTasksList = new ArrayList<Runnable>(TOTAL_THREADS_TO_RUN);
        for (int i = 0; i < TOTAL_THREADS_TO_RUN; i++) {
            final int counter = i;
            parrallelTasksList.add(new Runnable() {
                public void run() {
// <PSEUDOCODE>
                    setup object to test and invoke
                    assertThat(someMultiThreadSafeObjectToTest, is(tested));
// </PSEUDOCODE>
                }
            });
        }
        int maxTimeoutSeconds = 5;
        assertConcurrent("somemethod must be ThreadSafe", parrallelTasksList, maxTimeoutSeconds);
    }


public static void assertConcurrent(final String message, final List<? extends Runnable> runnables, final int maxTimeoutSeconds) throws InterruptedException {
  final int numThreads = runnables.size();
  final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
  final ExecutorService threadPool = Executors.newFixedThreadPool(numThreads);
  try {
    final CountDownLatch allExecutorThreadsReady = new CountDownLatch(numThreads);
    final CountDownLatch afterInitBlocker = new CountDownLatch(1);
    final CountDownLatch allDone = new CountDownLatch(numThreads);
    for (final Runnable submittedTestRunnable : runnables) {
      threadPool.submit(new Runnable() {
        public void run() {
          allExecutorThreadsReady.countDown();
          try {
            afterInitBlocker.await();
            submittedTestRunnable.run();
          } catch (final Throwable e) {
            exceptions.add(e);
          } finally {
            allDone.countDown();
          }
        }
      });
    }
    
// wait until all threads are ready
    assertTrue("Timeout initializing threads! Perform long lasting initializations before passing runnables to assertConcurrent", allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS));
    
// start all test runners
    afterInitBlocker.countDown();
    assertTrue(message +" timeout! More than" + maxTimeoutSeconds + "seconds", allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS));
  } finally {
    threadPool.shutdownNow();
  }
  assertTrue(message + "failed with exception(s)" + exceptions, exceptions.isEmpty());
}

I explain shortly the most important parts of this test. For every runnable we create an adaptor runnable that takes care of waiting for all threads to be created, submitting failures to our exceptions list and notification when processing finished. It is very important to catch Throwable since we expect anything to happen from AssertionException, ConcurrentModificationException to Business Exceptions. The adaptor runnables are being put into a jdk provided thread pool of the same size as the number of runnables. Just after being put into the thread pool an added thread executes and waits on

afterInitBlocker.await();

while the starting thread (may) waits on

allExecutorThreadsReady.await(runnables.size() * 10, TimeUnit.MILLISECONDS);

This way it is guaranteed that when we call

afterInitBlocker.countDown();

from the starting thread that all testing threads are fully initialized.

afterInitBlocker.countDown();

notifies all waiting threads and they start testing

submittedTestRunnable.run();

By starting all testing threads this way we achieve the maximum concurrent test load possible. The inner finally block assures that our testing thread is notified whether we catch a failure or the test runs smoothly. While the test threads execute whatever they are being put up to the starting thread waits at

allDone.await(maxTimeoutSeconds, TimeUnit.SECONDS);

It is possible the await fails with an Exception when the timeout is reached. This could be caused by a deadlock, thread starvation, other processing on the testing machine or even by the timeout being too short. The thread pool is stopped in any case – even when we got a timeout. At the end we check if any thread has aborted with an exception.

Copy link

ghost commented Oct 13, 2014

What's the license on this? Is it open source I can use legally or just something I can read?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment