Skip to content

Instantly share code, notes, and snippets.

@grignaak
Created October 4, 2011 08:51
Show Gist options
  • Save grignaak/1261175 to your computer and use it in GitHub Desktop.
Save grignaak/1261175 to your computer and use it in GitHub Desktop.
Wait free vs SynchronizedDeque vs Partially Synchronized Queues
// The wait-free algorithm (WaitFreeQueueRunner) is at the very bottom of this file.
package performance;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.experimental.theories.DataPoint;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
@RunWith(Theories.class)
public class CompareQueues {
/** Number of integers pushed through each runner in one benchmark pass. */
private static final int OP_COUNT = 1000000;

/** Unsynchronized producer, synchronized consumer, backed by a {@link LinkedBlockingQueue}. */
@DataPoint
public static final QueueRunner blocking =
        new SimpleQueueRunner(new LinkedBlockingQueue<Integer>());

/** Unsynchronized producer, synchronized consumer, backed by a {@link ConcurrentLinkedQueue}. */
@DataPoint
public static final QueueRunner concurrent =
        new SimpleQueueRunner(new ConcurrentLinkedQueue<Integer>());

/** Producer and consumer both synchronized, backed by a plain {@link ArrayDeque}. */
@DataPoint
public static final QueueRunner array =
        new SynchronizedQueueRunner(new ArrayDeque<Integer>());

/** Batch-draining runner guarded by an atomic flag, backed by a {@link ConcurrentLinkedQueue}. */
@DataPoint
public static final QueueRunner chunkConcurrent =
        new WaitFreeQueueRunner(new ConcurrentLinkedQueue<Integer>());
/**
 * Pushes {@code OP_COUNT} consecutive integers through the given runner, waits until all of
 * them have been consumed, and prints the average elapsed time per item.
 *
 * <p>The runner's thread pool is shut down even if enqueueing or waiting fails.
 *
 * @param queue the runner being measured
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *         runner to finish or to shut down
 */
@Theory
public void compare(QueueRunner queue) throws InterruptedException {
    try {
        long start = System.nanoTime();
        for (int i = 0; i < OP_COUNT; i++) {
            queue.add(i);
        }
        queue.await();
        long runningTime = System.nanoTime() - start;

        // Divide in floating point: long / int would truncate to whole nanoseconds
        // before the result is widened to double.
        double timePerOp = (double) runningTime / OP_COUNT; // includes thread overhead
        System.out.println(queue + " : " + timePerOp + " nanos / op");
    } finally {
        queue.shutdown();
    }
}
/**
 * Common scaffolding for one benchmark candidate: a queue, a two-thread pool that runs the
 * consumer side, and the {@link Accounting} instance that checks what was consumed.
 *
 * <p>Subclasses decide how items are enqueued ({@link #add}) and how a scheduled consumer
 * pass drains them ({@link #process}).
 */
private static abstract class QueueRunner {
    /** The queue under test; subclasses read and write it directly. */
    protected final Queue<Integer> queue;

    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final Accounting ledger = new Accounting();

    /** The single task instance submitted for every consumer pass. */
    private final Runnable drainTask = new Runnable() {
        @Override
        public void run() {
            process(ledger);
        }
    };

    QueueRunner(Queue<Integer> queue) {
        this.queue = queue;
    }

    /** Enqueues one item; implementations choose when to schedule a consumer pass. */
    abstract void add(Integer item);

    /** Performs one consumer pass, handing removed items to {@code accounting}. */
    protected abstract void process(Accounting accounting);

    /** Submits one consumer pass to the thread pool. */
    protected void exec() {
        workers.execute(drainTask);
    }

    /** Blocks until the accounting latch reports that all items were applied. */
    public void await() throws InterruptedException {
        ledger.await();
    }

    /** Stops the pool and waits, in one-second slices, until it has terminated. */
    public void shutdown() throws InterruptedException {
        workers.shutdownNow();
        for (;;) {
            if (workers.isTerminated()) {
                return;
            }
            workers.awaitTermination(1, TimeUnit.SECONDS);
        }
    }

    /** Returns the simple class name of the backing queue. */
    @Override
    public String toString() {
        Class<?> queueType = queue.getClass();
        return queueType.getSimpleName();
    }
}
/**
 * Checks that items arrive in ascending order starting at 0 and lets a waiter block until
 * {@code OP_COUNT} items have been accepted.
 *
 * <p>{@link #apply} does no synchronization of its own; callers must make sure that calls
 * are serialized.
 */
private static class Accounting {
    private final CountDownLatch done = new CountDownLatch(OP_COUNT);

    // Primitive counter: a boxed Integer would be re-boxed on every accepted item,
    // adding allocation noise to the path being measured.
    private int expected = 0;

    /** Blocks until {@code OP_COUNT} items have been accepted by {@link #apply}. */
    public void await() throws InterruptedException {
        done.await();
    }

    /**
     * Accepts the next item if it is the one expected.
     *
     * <p>A rejected item does not count down the latch.
     *
     * @param i the item just removed from the queue
     * @throws IllegalStateException if {@code i} is {@code null} or is not the next
     *         expected value
     */
    public void apply(Integer i) {
        if (i == null || i.intValue() != expected) {
            throw new IllegalStateException(
                    "out-of-order item: expected " + expected + " but got " + i);
        }
        expected++;
        done.countDown();
    }
}
public static class SynchronizedQueueRunner extends QueueRunner {
public SynchronizedQueueRunner(Queue<Integer> queue) {
super(queue);
}
public synchronized void add(Integer i) {
queue.add(i);
exec();
}
protected synchronized void process(Accounting accounting) {
accounting.apply(queue.remove());
}
public String toString() {
return "synchronized " + super.toString();
}
}
public static class SimpleQueueRunner extends QueueRunner {
SimpleQueueRunner(Queue<Integer> queue) {
super(queue);
}
public void add(Integer i) {
queue.add(i);
exec();
}
// NOTE: This is synchronized because accounting.apply is not!
protected synchronized void process(Accounting accounting) {
accounting.apply(queue.remove());
}
}
public static class WaitFreeQueueRunner extends QueueRunner {
private volatile boolean enqueued = false; // NOTE: this flag is just an optimization
private AtomicBoolean lock = new AtomicBoolean();
private static int MAX_ITEMS = 100;
public WaitFreeQueueRunner(Queue<Integer> queue) {
super(queue);
}
public void add(Integer i) {
queue.add(i);
if (!enqueued) {
enqueued = true;
exec();
}
}
protected void process(Accounting accounting) {
if (lock.getAndSet(true)) return;
for (int count = 0; !queue.isEmpty() && count < MAX_ITEMS; count++) {
// This represents long work where everything must be ordered
accounting.apply(queue.remove());
}
lock.set(false);
enqueued = false; // NOTE! Important that this comes after unlocking
if (!queue.isEmpty() && !enqueued) {
enqueued = true;
exec();
}
}
public String toString() {
return "wait-free with " + super.toString();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment