Skip to content

Instantly share code, notes, and snippets.

@mergeconflict
Last active December 15, 2015 21:58
Show Gist options
  • Save mergeconflict/5329172 to your computer and use it in GitHub Desktop.
Save mergeconflict/5329172 to your computer and use it in GitHub Desktop.
Queue based on Hinze-Paterson 2-3 finger trees (http://www.soi.city.ac.uk/~ross/papers/FingerTree.pdf), BlockingQueue based on AtomicReference and a counting Semaphore.
package mergeconflict.collection.concurrent;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import mergeconflict.collection.immutable.Queue;
/**
 * A blocking FIFO queue built from an immutable {@link Queue} held in an
 * {@link AtomicReference}, plus a counting {@link Semaphore}.
 *
 * <p>Producers and consumers swap the current immutable queue for a new one
 * with a compare-and-set retry loop. The semaphore starts with zero permits
 * and gains one permit per completed {@link #enqueue}; {@link #dequeue} takes
 * a permit before touching the queue, so it blocks while no elements are
 * available.
 *
 * @param <E> the element type
 */
public class BlockingQueue<E> {

    /** One permit per element that has been enqueued and not yet claimed by a consumer. */
    private final Semaphore available = new Semaphore(0);

    /** The current immutable snapshot of the queue contents. */
    private final AtomicReference<Queue<E>> contents =
            new AtomicReference<Queue<E>>(Queue.<E> empty());

    /**
     * Appends {@code element} to the queue and then releases one permit so
     * that a waiting (or future) {@link #dequeue} call may proceed.
     *
     * @param element the element to add
     */
    public void enqueue(E element) {
        for (;;) {
            final Queue<E> snapshot = contents.get();
            final Queue<E> grown = snapshot.enqueue(element);
            // Retry from a fresh snapshot if another thread changed the reference meanwhile.
            if (contents.compareAndSet(snapshot, grown)) {
                break;
            }
        }
        // Publish the permit only after the element is actually visible in the queue.
        available.release();
    }

    /**
     * Removes and returns the oldest element, waiting for a permit first.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting to acquire a permit
     */
    public E dequeue() throws InterruptedException {
        available.acquire();
        for (;;) {
            final Queue<E> snapshot = contents.get();
            final Queue<E> shrunk = snapshot.tail();
            if (contents.compareAndSet(snapshot, shrunk)) {
                // The snapshot is immutable, so its head is exactly the element we removed.
                return snapshot.head();
            }
        }
    }
}
package mergeconflict.collection.immutable;
import java.util.NoSuchElementException;
/**
 * An immutable (persistent) first-in, first-out queue.
 *
 * <p>Every operation returns a new queue and leaves the receiver untouched, so
 * instances can be shared freely. Obtain the starting point from
 * {@link #empty()}.
 *
 * <p>Internally a queue is one of three shapes: {@code Empty}, {@code Single}
 * (exactly one element), or {@code Deep} (a front digit, a nested "spine"
 * queue of three-element nodes, and a back digit). New elements enter at the
 * front digit; the oldest element is read from the back digit.
 *
 * @param <E> the element type
 */
public abstract class Queue<E> {

    /**
     * Returns a queue that contains every element of this queue followed by
     * {@code element}.
     *
     * @param element the element to append
     * @return a new queue; this queue is not modified
     */
    public abstract Queue<E> enqueue(E element);

    /**
     * Returns the oldest element of this queue.
     *
     * @return the element that {@link #tail()} would drop
     * @throws NoSuchElementException if this queue is empty
     */
    public abstract E head();

    /**
     * Returns a queue that contains every element of this queue except the
     * oldest one.
     *
     * @return a new queue; this queue is not modified
     * @throws NoSuchElementException if this queue is empty
     */
    public abstract Queue<E> tail();

    /**
     * Reports whether this queue holds no elements.
     *
     * @return {@code true} if this queue is empty
     */
    public abstract boolean isEmpty();

    /**
     * A digit holding one to four elements. Fields are ordered from newest
     * ({@code a}) to oldest (the last field): {@code enqueue} prepends and
     * {@code head} reads the last field.
     */
    private interface Node<E> {

        /**
         * Adds {@code element} to this digit, which is the front digit of a
         * deep queue with the given {@code spine} and back digit {@code rhs}.
         */
        Queue<E> enqueue(E element, Queue<Node<E>> spine, Node<E> rhs);

        /** Returns the oldest element stored in this digit. */
        E head();

        /**
         * Drops the oldest element of this digit, which is the back digit of a
         * deep queue with front digit {@code lhs} and the given {@code spine}.
         */
        Queue<E> tail(Node<E> lhs, Queue<Node<E>> spine);

        /** Rebuilds a stand-alone queue that contains exactly this digit's elements. */
        Queue<E> toTree();

        /** Digit with a single element. */
        static final class One<E> implements Node<E> {
            private final E a;

            public One(E a) {
                this.a = a;
            }

            @Override
            public Queue<E> enqueue(E element, Queue<Node<E>> spine, Node<E> rhs) {
                return new Deep<E>(new Two<E>(element, a), spine, rhs);
            }

            @Override
            public E head() {
                return a;
            }

            @Override
            public Queue<E> tail(Node<E> lhs, Queue<Node<E>> spine) {
                // Removing the only element empties the back digit. With nothing in the
                // spine, the front digit alone becomes the whole queue.
                if (spine.isEmpty()) {
                    return lhs.toTree();
                }
                // Otherwise the oldest spine node becomes the new back digit.
                return new Deep<E>(lhs, spine.tail(), spine.head());
            }

            @Override
            public Queue<E> toTree() {
                return new Single<E>(a);
            }
        }

        /** Digit with two elements. */
        static final class Two<E> implements Node<E> {
            private final E a;
            private final E b;

            public Two(E a, E b) {
                this.a = a;
                this.b = b;
            }

            @Override
            public Queue<E> enqueue(E element, Queue<Node<E>> spine, Node<E> rhs) {
                return new Deep<E>(new Three<E>(element, a, b), spine, rhs);
            }

            @Override
            public E head() {
                return b;
            }

            @Override
            public Queue<E> tail(Node<E> lhs, Queue<Node<E>> spine) {
                return new Deep<E>(lhs, spine, new One<E>(a));
            }

            @Override
            public Queue<E> toTree() {
                return new Deep<E>(new One<E>(a), Queue.<Node<E>> empty(), new One<E>(b));
            }
        }

        /** Digit with three elements; also the only node shape stored in a spine. */
        static final class Three<E> implements Node<E> {
            private final E a;
            private final E b;
            private final E c;

            public Three(E a, E b, E c) {
                this.a = a;
                this.b = b;
                this.c = c;
            }

            @Override
            public Queue<E> enqueue(E element, Queue<Node<E>> spine, Node<E> rhs) {
                return new Deep<E>(new Four<E>(element, a, b, c), spine, rhs);
            }

            @Override
            public E head() {
                return c;
            }

            @Override
            public Queue<E> tail(Node<E> lhs, Queue<Node<E>> spine) {
                return new Deep<E>(lhs, spine, new Two<E>(a, b));
            }

            @Override
            public Queue<E> toTree() {
                return new Deep<E>(new One<E>(a), Queue.<Node<E>> empty(), new Two<E>(b, c));
            }
        }

        /** Digit with four elements (the maximum). */
        static final class Four<E> implements Node<E> {
            private final E a;
            private final E b;
            private final E c;
            private final E d;

            public Four(E a, E b, E c, E d) {
                this.a = a;
                this.b = b;
                this.c = c;
                this.d = d;
            }

            @Override
            public Queue<E> enqueue(E element, Queue<Node<E>> spine, Node<E> rhs) {
                // The digit is full: keep the two newest elements in front and push the
                // three older ones onto the spine as a single node.
                return new Deep<E>(new Two<E>(element, a), spine.enqueue(new Three<E>(b, c, d)), rhs);
            }

            @Override
            public E head() {
                return d;
            }

            @Override
            public Queue<E> tail(Node<E> lhs, Queue<Node<E>> spine) {
                return new Deep<E>(lhs, spine, new Three<E>(a, b, c));
            }

            @Override
            public Queue<E> toTree() {
                return new Deep<E>(new One<E>(a), Queue.<Node<E>> empty(), new Three<E>(b, c, d));
            }
        }
    }

    /** The queue with no elements. */
    private static final class Empty<E> extends Queue<E> {
        @Override
        public Queue<E> enqueue(E element) {
            return new Single<E>(element);
        }

        @Override
        public E head() {
            throw new NoSuchElementException("head of empty queue");
        }

        @Override
        public Queue<E> tail() {
            throw new NoSuchElementException("tail of empty queue");
        }

        @Override
        public boolean isEmpty() {
            return true;
        }
    }

    /** A queue with exactly one element. */
    private static final class Single<E> extends Queue<E> {
        private final E a;

        public Single(E a) {
            this.a = a;
        }

        @Override
        public Queue<E> enqueue(E element) {
            // The new element goes to the front digit; the existing one stays oldest.
            return new Deep<E>(new Node.One<E>(element), Queue.<Node<E>> empty(), new Node.One<E>(a));
        }

        @Override
        public E head() {
            return a;
        }

        @Override
        public Queue<E> tail() {
            return empty();
        }

        @Override
        public boolean isEmpty() {
            return false;
        }
    }

    /**
     * A queue with at least two elements: a front digit ({@code lhs}) that
     * receives new elements, a spine of older three-element nodes, and a back
     * digit ({@code rhs}) that holds the oldest elements.
     */
    private static final class Deep<E> extends Queue<E> {
        private final Node<E> lhs;
        private final Node<E> rhs;
        private final Queue<Node<E>> spine;

        public Deep(Node<E> lhs, Queue<Node<E>> spine, Node<E> rhs) {
            this.lhs = lhs;
            this.spine = spine;
            this.rhs = rhs;
        }

        @Override
        public Queue<E> enqueue(E element) {
            return lhs.enqueue(element, spine, rhs);
        }

        @Override
        public E head() {
            return rhs.head();
        }

        @Override
        public Queue<E> tail() {
            return rhs.tail(lhs, spine);
        }

        @Override
        public boolean isEmpty() {
            return false;
        }
    }

    /** Shared instance returned by {@link #empty()} for every element type. */
    private static final Queue<Object> EMPTY = new Empty<Object>();

    /**
     * Returns the empty queue.
     *
     * @param <E> the element type
     * @return a queue for which {@link #isEmpty()} is {@code true}
     */
    // Safe: Empty stores no elements, so one instance can stand in for any element type.
    @SuppressWarnings("unchecked")
    public static <E> Queue<E> empty() {
        return (Queue<E>) EMPTY;
    }
}
@mjpt777
Copy link

mjpt777 commented Apr 9, 2013

I have a suite of tests that I can run against it next week. However, at first blush the code does not look like it will be that fast: there is a lot of indirection, plus the cost of the CAS operations. The most significant issue will be the operating-system scheduling costs that most blocking queues face. These are usually better addressed with some thread affinity and OS tuning, if you really must use locks or semaphores.

@viktorklang
Copy link

Yep, everything will turn megamorphic...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment