Last active
January 31, 2016 06:38
-
-
Save soulmachine/e8172a0fbb55c72adb4c to your computer and use it in GitHub Desktop.
My implementations of BlockingQueue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
// Reference: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html | |
public class MyArrayBlockingQueue<E> { | |
final ReentrantLock lock = new ReentrantLock(); | |
final Condition notFull = lock.newCondition(); | |
final Condition notEmpty = lock.newCondition(); | |
final Object[] items; | |
int putIndex, takeIndex, count; | |
public MyArrayBlockingQueue(int capacity) { | |
if (capacity <= 0) throw new IllegalArgumentException(); | |
this.items = new Object[capacity]; | |
} | |
/** | |
* Inserts the specified element at the tail of this queue, waiting | |
* for space to become available if the queue is full. | |
* | |
* @throws InterruptedException {@inheritDoc} | |
* @throws NullPointerException {@inheritDoc} | |
*/ | |
public void put(E e) throws InterruptedException { | |
if (e == null) throw new NullPointerException(); | |
final ReentrantLock lock = this.lock; | |
lock.lockInterruptibly(); | |
try { | |
while (count == items.length) | |
notFull.await(); | |
enqueue(e); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting if necessary | |
* until an element becomes available. | |
* | |
* @return the head of this queue | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
public E take() throws InterruptedException { | |
final ReentrantLock lock = this.lock; | |
lock.lockInterruptibly(); | |
try { | |
while (count == 0) | |
notEmpty.await(); | |
return dequeue(); | |
} finally { | |
lock.unlock(); | |
} | |
} | |
/** | |
* Inserts element at current put position, advances, and signals. | |
* Call only when holding lock. | |
*/ | |
private void enqueue(E x) { | |
// assert lock.getHoldCount() == 1; | |
// assert items[putIndex] == null; | |
final Object[] items = this.items; | |
items[putIndex] = x; | |
if (++putIndex == items.length) | |
putIndex = 0; | |
count++; | |
notEmpty.signal(); | |
} | |
/** | |
* Extracts element at current take position, advances, and signals. | |
* Call only when holding lock. | |
*/ | |
private E dequeue() { | |
// assert lock.getHoldCount() == 1; | |
// assert items[takeIndex] != null; | |
final Object[] items = this.items; | |
@SuppressWarnings("unchecked") | |
E x = (E) items[takeIndex]; | |
items[takeIndex] = null; | |
if (++takeIndex == items.length) | |
takeIndex = 0; | |
count--; | |
notFull.signal(); | |
return x; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.locks.Condition; | |
import java.util.concurrent.locks.ReentrantLock; | |
public final class MyLinkedBlockingQueue<E> { | |
/** | |
* Linked list node class | |
*/ | |
static class Node<E> { | |
E item; | |
/** | |
* One of: | |
* - the real successor Node | |
* - this Node, meaning the successor is head.next | |
* - null, meaning there is no successor (this is the last node) | |
*/ | |
Node<E> next; | |
Node(E x) { item = x; } | |
} | |
/** The capacity bound, or Integer.MAX_VALUE if none */ | |
private final int capacity; | |
/** Current number of elements */ | |
private final AtomicInteger count = new AtomicInteger(); | |
/** | |
* Head of linked list. | |
* Invariant: head.item == null | |
*/ | |
transient Node<E> head; | |
/** | |
* Tail of linked list. | |
* Invariant: last.next == null | |
*/ | |
private transient Node<E> last; | |
/** Lock held by take, poll, etc */ | |
private final ReentrantLock takeLock = new ReentrantLock(); | |
/** Wait queue for waiting takes */ | |
private final Condition notEmpty = takeLock.newCondition(); | |
/** Lock held by put, offer, etc */ | |
private final ReentrantLock putLock = new ReentrantLock(); | |
/** Wait queue for waiting puts */ | |
private final Condition notFull = putLock.newCondition(); | |
/** | |
* Signals a waiting take. Called only from put/offer (which do not | |
* otherwise ordinarily lock takeLock.) | |
*/ | |
private void signalNotEmpty() { | |
final ReentrantLock takeLock = this.takeLock; | |
takeLock.lock(); | |
try { | |
notEmpty.signal(); | |
} finally { | |
takeLock.unlock(); | |
} | |
} | |
/** | |
* Signals a waiting put. Called only from take/poll. | |
*/ | |
private void signalNotFull() { | |
final ReentrantLock putLock = this.putLock; | |
putLock.lock(); | |
try { | |
notFull.signal(); | |
} finally { | |
putLock.unlock(); | |
} | |
} | |
/** | |
* Links node at end of queue. | |
* | |
* @param node the node | |
*/ | |
private void enqueue(Node<E> node) { | |
// assert putLock.isHeldByCurrentThread(); | |
// assert last.next == null; | |
last = last.next = node; | |
} | |
/** | |
* Removes a node from head of queue. | |
* | |
* @return the node | |
*/ | |
private E dequeue() { | |
// assert takeLock.isHeldByCurrentThread(); | |
// assert head.item == null; | |
Node<E> h = head; | |
Node<E> first = h.next; | |
h.next = h; // help GC | |
head = first; | |
E x = first.item; | |
first.item = null; | |
return x; | |
} | |
public MyLinkedBlockingQueue(int capacity) { | |
if (capacity <= 0) throw new IllegalArgumentException(); | |
this.capacity = capacity; | |
last = head = new Node<E>(null); | |
} | |
/** | |
* Inserts the specified element at the tail of this queue, waiting if | |
* necessary for space to become available. | |
* | |
* @throws InterruptedException {@inheritDoc} | |
* @throws NullPointerException {@inheritDoc} | |
*/ | |
public void put(E e) throws InterruptedException { | |
if (e == null) throw new NullPointerException(); | |
// Note: convention in all put/take/etc is to preset local var | |
// holding count negative to indicate failure unless set. | |
int c = -1; | |
Node<E> node = new Node<E>(e); | |
final ReentrantLock putLock = this.putLock; | |
final AtomicInteger count = this.count; | |
putLock.lockInterruptibly(); | |
try { | |
/* | |
* Note that count is used in wait guard even though it is | |
* not protected by lock. This works because count can | |
* only decrease at this point (all other puts are shut | |
* out by lock), and we (or some other waiting put) are | |
* signalled if it ever changes from capacity. Similarly | |
* for all other uses of count in other wait guards. | |
*/ | |
while (count.get() == capacity) { | |
notFull.await(); | |
} | |
enqueue(node); | |
c = count.getAndIncrement(); | |
if (c + 1 < capacity) | |
notFull.signal(); | |
} finally { | |
putLock.unlock(); | |
} | |
if (c == 0) | |
signalNotEmpty(); | |
} | |
/** | |
* Retrieves and removes the head of this queue, waiting if necessary | |
* until an element becomes available. | |
* | |
* @return the head of this queue | |
* @throws InterruptedException if interrupted while waiting | |
*/ | |
public E take() throws InterruptedException { | |
E x; | |
int c = -1; | |
final AtomicInteger count = this.count; | |
final ReentrantLock takeLock = this.takeLock; | |
takeLock.lockInterruptibly(); | |
try { | |
while (count.get() == 0) { | |
notEmpty.await(); | |
} | |
x = dequeue(); | |
c = count.getAndDecrement(); | |
if (c > 1) | |
notEmpty.signal(); | |
} finally { | |
takeLock.unlock(); | |
} | |
if (c == capacity) | |
signalNotFull(); | |
return x; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment