-
-
Save viktorklang/865809 to your computer and use it in GitHub Desktop.
package akka.util | |
import java.util.concurrent.locks.ReentrantLock | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.{ TimeUnit, BlockingQueue } | |
import java.util.{ AbstractQueue, Queue, Collection, Iterator } | |
/**
 * A capacity-bounded `BlockingQueue` layered on top of an arbitrary `java.util.Queue`.
 *
 * Every access to the backing queue happens while holding a single fair `ReentrantLock`,
 * so the backing queue itself does not need to be thread-safe. Two conditions derived
 * from that lock coordinate blocked callers:
 *  - `notEmpty` is signalled whenever an element is added (wakes consumers),
 *  - `notFull` is signalled whenever room is freed (wakes producers).
 *
 * Implementation note: the methods use plain loops and try/finally instead of
 * higher-order helpers so that no closures are allocated per operation.
 *
 * @param maxCapacity the maximum number of elements held at once; must be > 0
 * @param backing     the queue used for storage; must be non-null and empty, and if it is
 *                    itself a `BlockingQueue` it must have room for at least `maxCapacity`
 * @tparam E the element type; null elements are rejected
 * @throws IllegalArgumentException if `backing` is null or any requirement above is violated
 */
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {

  // Constructor-time validation of the backing queue and the requested capacity.
  backing match {
    case null => throw new IllegalArgumentException("Backing Queue may not be null")
    case b: BlockingQueue[_] =>
      require(maxCapacity > 0)
      require(b.size() == 0)
      require(b.remainingCapacity >= maxCapacity)
    case b: Queue[_] =>
      require(b.size() == 0)
      require(maxCapacity > 0)
  }

  // Fair lock (constructed with fair = true) guarding every access to `backing`.
  protected val lock = new ReentrantLock(true)
  private val notEmpty = lock.newCondition()
  private val notFull = lock.newCondition()

  /**
   * Inserts `e`, blocking until there is room.
   *
   * @throws NullPointerException if `e` is null
   * @throws InterruptedException if the calling thread is interrupted while acquiring the
   *                              lock or while waiting for room
   */
  def put(e: E): Unit = {
    if (e eq null) throw new NullPointerException
    // Interruptible acquisition, consistent with take() and the timed offer/poll.
    lock.lockInterruptibly()
    try {
      while (backing.size() == maxCapacity)
        notFull.await()
      require(backing.offer(e))
      notEmpty.signal()
    } finally {
      lock.unlock()
    }
  }

  /**
   * Removes and returns the head of the queue, blocking until an element is available.
   *
   * @throws InterruptedException if interrupted while waiting
   */
  def take(): E = {
    lock.lockInterruptibly()
    try {
      while (backing.size() == 0)
        notEmpty.await()
      val e = backing.poll()
      require(e ne null)
      notFull.signal()
      e
    } finally {
      lock.unlock()
    }
  }

  /**
   * Inserts `e` if there is room right now.
   *
   * @return true if the element was added, false if the queue is at `maxCapacity`
   * @throws NullPointerException if `e` is null
   */
  def offer(e: E): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      if (backing.size() == maxCapacity) false
      else {
        require(backing.offer(e)) //Should never fail
        notEmpty.signal()
        true
      }
    } finally {
      lock.unlock()
    }
  }

  /**
   * Inserts `e`, waiting up to the given timeout for room to become available.
   *
   * @return true if the element was added, false if the timeout elapsed first
   * @throws NullPointerException if `e` is null
   * @throws InterruptedException if interrupted while waiting
   */
  def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
    if (e eq null) throw new NullPointerException
    var nanos = unit.toNanos(timeout)
    lock.lockInterruptibly()
    try {
      while (backing.size() == maxCapacity) {
        if (nanos <= 0)
          return false
        // awaitNanos returns the remaining wait budget
        nanos = notFull.awaitNanos(nanos)
      }
      require(backing.offer(e)) //Should never fail
      notEmpty.signal()
      true
    } finally {
      lock.unlock()
    }
  }

  /**
   * Removes and returns the head of the queue, waiting up to the given timeout for an
   * element to become available.
   *
   * @return the head element, or null if the timeout elapsed first
   * @throws InterruptedException if interrupted while waiting
   */
  def poll(timeout: Long, unit: TimeUnit): E = {
    var nanos = unit.toNanos(timeout)
    lock.lockInterruptibly()
    try {
      var result: E = null.asInstanceOf[E]
      var done = false
      while (!done) {
        backing.poll() match {
          case null if nanos <= 0 =>
            // Timed out with nothing available: report null.
            done = true
          case null =>
            try {
              nanos = notEmpty.awaitNanos(nanos)
            } catch {
              case ie: InterruptedException =>
                // Pass the wake-up on to another waiter before propagating the interrupt.
                notEmpty.signal()
                throw ie
            }
          case e =>
            notFull.signal()
            result = e
            done = true
        }
      }
      result
    } finally {
      lock.unlock()
    }
  }

  /**
   * Removes and returns the head of the queue if one is available right now.
   *
   * @return the head element, or null if the queue is empty
   */
  def poll(): E = {
    lock.lock()
    try {
      backing.poll() match {
        case null => null.asInstanceOf[E]
        case e =>
          notFull.signal()
          e
      }
    } finally {
      lock.unlock()
    }
  }

  /**
   * Removes a single occurrence of `e` as decided by the backing queue.
   *
   * @return true if an element was removed
   * @throws NullPointerException if `e` is null
   */
  override def remove(e: AnyRef): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      if (backing remove e) {
        notFull.signal()
        true
      } else false
    } finally {
      lock.unlock()
    }
  }

  /**
   * @return true if the backing queue contains `e`
   * @throws NullPointerException if `e` is null
   */
  override def contains(e: AnyRef): Boolean = {
    if (e eq null) throw new NullPointerException
    lock.lock()
    try {
      backing contains e
    } finally {
      lock.unlock()
    }
  }

  /** Removes all elements and wakes every producer waiting for room. */
  override def clear(): Unit = {
    lock.lock()
    try {
      backing.clear()
      // The whole capacity is free again, so every blocked producer may proceed.
      notFull.signalAll()
    } finally {
      lock.unlock()
    }
  }

  /** @return the number of additional elements that can currently be added */
  def remainingCapacity(): Int = {
    lock.lock()
    try {
      maxCapacity - backing.size()
    } finally {
      lock.unlock()
    }
  }

  /** @return the current number of elements */
  def size(): Int = {
    lock.lock()
    try {
      backing.size()
    } finally {
      lock.unlock()
    }
  }

  /** @return the head of the queue without removing it, or null if the queue is empty */
  def peek(): E = {
    lock.lock()
    try {
      backing.peek()
    } finally {
      lock.unlock()
    }
  }

  /** Drains every available element into `c`; see the two-argument overload. */
  def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)

  /**
   * Moves up to `maxElements` elements from the head of this queue into `c`.
   *
   * @return the number of elements transferred
   * @throws NullPointerException     if `c` is null
   * @throws IllegalArgumentException if `c` is this queue or its backing queue
   */
  def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
    if (c eq null) throw new NullPointerException
    if (c eq this) throw new IllegalArgumentException
    // Draining into the backing queue would just re-enqueue what was polled.
    if (c eq backing) throw new IllegalArgumentException
    if (maxElements <= 0) 0
    else {
      lock.lock()
      try {
        var n = 0
        try {
          var exhausted = false
          while (!exhausted && n < maxElements) {
            backing.poll() match {
              case null => exhausted = true
              case e =>
                // Count before adding so that room freed is still announced if add throws.
                n += 1
                c add e
            }
          }
        } finally {
          // Several slots may have been freed at once: wake all waiting producers.
          if (n > 0) notFull.signalAll()
        }
        n
      } finally {
        lock.unlock()
      }
    }
  }

  /** @return true if the backing queue contains every element of `c` */
  override def containsAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      backing containsAll c
    } finally {
      lock.unlock()
    }
  }

  /**
   * Removes every element that is also contained in `c`.
   *
   * @return true if the queue changed as a result
   */
  override def removeAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.removeAll(c)) {
        // More than one slot may have been freed, so wake all producers. Consumers need no
        // signal: removing elements can never turn an empty queue into a non-empty one.
        notFull.signalAll()
        true
      } else false
    } finally {
      lock.unlock()
    }
  }

  /**
   * Retains only the elements that are also contained in `c`.
   *
   * @return true if the queue changed as a result
   */
  override def retainAll(c: Collection[_]): Boolean = {
    lock.lock()
    try {
      if (backing.retainAll(c)) {
        // Same reasoning as removeAll: only producers can benefit from elements going away.
        notFull.signalAll()
        true
      } else false
    } finally {
      lock.unlock()
    }
  }

  /**
   * Returns an iterator over a snapshot of the elements taken under the lock at call time.
   * Additions made after the snapshot are not visible through the iterator. `remove()`
   * deletes the first element of the live queue that is reference-equal to the element
   * last returned by `next()`, if it is still present.
   */
  def iterator(): Iterator[E] = {
    lock.lock()
    try {
      val elements = backing.toArray
      new Iterator[E] {
        var at = 0
        var last = -1

        def hasNext(): Boolean = at < elements.length

        def next(): E = {
          if (at >= elements.length) throw new NoSuchElementException
          last = at
          at += 1
          elements(last).asInstanceOf[E]
        }

        def remove(): Unit = {
          if (last < 0) throw new IllegalStateException
          val target = elements(last)
          last = -1 //To avoid 2 subsequent removes without a next in between
          lock.lock()
          try {
            val i = backing.iterator()
            var removed = false
            while (!removed && i.hasNext) {
              if (i.next() eq target) {
                i.remove()
                notFull.signal()
                removed = true
              }
            }
          } finally {
            lock.unlock()
          }
        }
      }
    } finally {
      lock.unlock()
    }
  }

  /** @return an array copy of the elements, produced by the backing queue under the lock */
  override def toArray(): Array[AnyRef] = {
    lock.lock()
    try {
      backing.toArray
    } finally {
      lock.unlock()
    }
  }

  /** @return true if the backing queue holds no elements */
  override def isEmpty(): Boolean = {
    lock.lock()
    try {
      backing.isEmpty()
    } finally {
      lock.unlock()
    }
  }

  //FIXME Implement toArray[T] => Array[T]
}
Hey Jed,
No closures allowed, will be used in a highly concurrent environment, so allocations when not needed must be avoided.
I've run a few tests, but I'd really love for there to be a jsr166-test.jar so you could do something simple like:
class MyBlockingQueueImplTest extends AbstractBlockingQueueTest {
def createBounded(bounds: Int): BlockingQueue = new MyBlockingQueue(bounds)
def createUnbounded(): BlockingQueue = new MyBlockingQueue()
}
And voilà!
Also, do you really want a general purpose implementation? There's a lot of work that could be avoided by for instance making the iterator not support element removal.
Hi Victor,
Have you run tests against a closure-using version, or is this a general aversion? The JVM is particularly good at handling short-lived objects, after all.
BTW, my latest version has the following syntax:
protected def empty = isEmpty
protected def full = backing.size == maxCapacity
//Blocks until not full
override def put(e: E) {
notNull(e)
when(notFull) {
notEmpty.signalAndReturn(require(backing offer e))
}
}
override def take = when(notEmpty) {
notFull.signalAndReturn(notNull(backing.poll()))
}
//Tries to do it immediately, if fail return false
def offer(e: E): Boolean = {
notNull(e)
lock(notEmpty.signalIf(!full && backing.offer(e)))
}
//Tries to remove the head of the queue immediately, if fail, return null
override def poll(): E = lock(notFull.signalIfNotNull(() => backing.poll()))
Also, not a generic test but certainly adaptable:
Problem is that then you need to copy paste a lot of code, and sooner or later, that has some other deps, and then you need to pull in even more stuff.
I'd say that there's a market for a self-contained jsr166-test.jar :-)
Victor,
This is a bit large and hairy for a detailed review, particularly without inline comments. One thing I did notice is that ABQ implements a resignal if a condition.await() is interrupted. In theory this might prevent a missed signal, but then, LBQ doesn't implement this so I am not entirely sure if it is necessary.
I explored a way to make it a bit cleaner, basically extracting the bounds checking into a trait that allows cleaner calling syntax:
This is as far as I have gotten so far, I am still working out the timeout support: https://bitbucket.org/jwesleysmith/atlassian-util-scala/src/tip/src/main/scala/com/atlassian/util/scala/concurrent/Bounded.scala