Skip to content

Instantly share code, notes, and snippets.

@viktorklang
Created March 11, 2011 12:07
Show Gist options
  • Save viktorklang/865809 to your computer and use it in GitHub Desktop.
Save viktorklang/865809 to your computer and use it in GitHub Desktop.
Just a draft
package akka.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")
case b: BlockingQueue[_] =>
require(maxCapacity > 0)
require(b.size() == 0)
require(b.remainingCapacity >= maxCapacity)
case b: Queue[_] =>
require(b.size() == 0)
require(maxCapacity > 0)
}
protected val lock = new ReentrantLock(true)
private val notEmpty = lock.newCondition()
private val notFull = lock.newCondition()
def put(e: E): Unit = { //Blocks until not full
if (e eq null) throw new NullPointerException
lock.lock()
try {
while (backing.size() == maxCapacity)
notFull.await()
require(backing.offer(e))
notEmpty.signal()
} finally {
lock.unlock()
}
}
def take(): E = { //Blocks until not empty
lock.lockInterruptibly()
try {
while (backing.size() == 0)
notEmpty.await()
val e = backing.poll()
require(e ne null)
notFull.signal()
e
} finally {
lock.unlock()
}
}
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing.size() == maxCapacity) false
else {
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
}
} finally {
lock.unlock()
}
}
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
if (e eq null) throw new NullPointerException
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
while(backing.size() == maxCapacity) {
if (nanos <= 0)
return false
else
nanos = notFull.awaitNanos(nanos)
}
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
} finally {
lock.unlock()
}
}
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
var result: E = null.asInstanceOf[E]
var hasResult = false
while(!hasResult) {
hasResult = backing.poll() match {
case null if nanos <= 0 =>
result = null.asInstanceOf[E]
true
case null =>
try {
nanos = notEmpty.awaitNanos(nanos)
} catch {
case ie: InterruptedException =>
notEmpty.signal()
throw ie
}
false
case e =>
notFull.signal()
result = e
true
}
}
result
} finally {
lock.unlock()
}
}
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
lock.lock()
try {
backing.poll() match {
case null => null.asInstanceOf[E]
case e =>
notFull.signal()
e
}
} finally {
lock.unlock
}
}
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing remove e) {
notFull.signal()
true
} else false
} finally {
lock.unlock()
}
}
override def contains(e: AnyRef): Boolean = {
if (e eq null) throw new NullPointerException
lock.lock()
try {
backing contains e
} finally {
lock.unlock()
}
}
override def clear(): Unit = {
lock.lock()
try {
backing.clear
} finally {
lock.unlock()
}
}
def remainingCapacity(): Int = {
lock.lock()
try {
maxCapacity - backing.size()
} finally {
lock.unlock()
}
}
def size(): Int = {
lock.lock()
try {
backing.size()
} finally {
lock.unlock()
}
}
def peek(): E = {
lock.lock()
try {
backing.peek()
} finally {
lock.unlock()
}
}
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
if (c eq null) throw new NullPointerException
if (c eq this) throw new IllegalArgumentException
if (maxElements <= 0) 0
else {
lock.lock()
try {
var n = 0
var e: E = null.asInstanceOf[E]
while(n < maxElements) {
backing.poll() match {
case null => return n
case e =>
c add e
n += 1
}
}
n
} finally {
lock.unlock()
}
}
}
override def containsAll(c: Collection[_]): Boolean = {
lock.lock()
try {
backing containsAll c
} finally {
lock.unlock()
}
}
override def removeAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.removeAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal()
if (sz > 0) notEmpty.signal() //FIXME needed?
true
} else false
} finally {
lock.unlock()
}
}
override def retainAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.retainAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal() //FIXME needed?
if (sz > 0) notEmpty.signal()
true
} else false
} finally {
lock.unlock()
}
}
def iterator(): Iterator[E] = {
lock.lock
try {
val elements = backing.toArray
new Iterator[E] {
var at = 0
var last = -1
def hasNext(): Boolean = at < elements.length
def next(): E = {
if (at >= elements.length) throw new NoSuchElementException
last = at
at += 1
elements(last).asInstanceOf[E]
}
def remove(): Unit = {
if (last < 0) throw new IllegalStateException
val target = elements(last)
last = -1 //To avoid 2 subsequent removes without a next in between
lock.lock()
try {
val i = backing.iterator()
while(i.hasNext) {
if (i.next eq target) {
i.remove()
notFull.signal()
return ()
}
}
} finally {
lock.unlock()
}
}
}
} finally {
lock.unlock
}
}
override def toArray(): Array[AnyRef] = {
lock.lock()
try {
backing.toArray
} finally {
lock.unlock()
}
}
override def isEmpty(): Boolean = {
lock.lock()
try {
backing.isEmpty()
} finally {
lock.unlock()
}
}
//FIXME Implement toArray[T] => Array[T]
}
@jedws
Copy link

jedws commented Mar 13, 2011

Victor,

This is a bit large and hairy for a detailed review, particularly without inline comments. One thing I did notice is that ABQ implements a resignal if a condition.await() is interrupted. In theory this might prevent a missed signal, but then, LBQ doesn't implement this so I am not entirely sure if it is necessary.

I explored a way to make it a bit cleaner, basically extracting the bounds checking into a trait that allows cleaner calling syntax:

def put(e: E): Unit = { //Blocks until not full
  notNull(e)
  lock(
    whenNotFull(() =>
      require(backing.offer(e))
    )
  )
}

def take(): E = {
  lockInterruptibly(
    whenNotEmpty(() =>
      notNull(backing.poll())
    )
  )
}

This is as far as I have gotten so far, I am still working out the timeout support: https://bitbucket.org/jwesleysmith/atlassian-util-scala/src/tip/src/main/scala/com/atlassian/util/scala/concurrent/Bounded.scala

@viktorklang
Copy link
Author

Hey Jed,

No closures allowed, will be used in a highly concurrent environment, so allocations when not needed must be avoided.

I've ran a few tests, but I'd really love for there to be a jsr166-test.jar so you could do something simple like:

class MyBlockingQueueImplTest extends AbstractBlockingQueueTest {
def createBounded(bounds: Int): BlockingQueue = new MyBlockingQueue(bounds)
def createUnbounded(): BlockingQueue = new MyBlockingQueue()
}

And violá!

@jedws
Copy link

jedws commented Mar 14, 2011

Also, do you really want a general purpose implementation? There's a lot of work that could be avoided by for instance making the iterator not support element removal.

@jedws
Copy link

jedws commented Mar 14, 2011

Hi Victor,

have you run tests against a closure using version or is this a general aversion? The JVM is particularly good at handling short lived objects after all.

BTW, my latest version has the following syntax:

protected def empty = isEmpty
protected def full = backing.size == maxCapacity

//Blocks until not full
override def put(e: E) {
  notNull(e)
  when(notFull) {
    notEmpty.signalAndReturn(require(backing offer e))
  }
}

override def take = when(notEmpty) {
  notFull.signalAndReturn(notNull(backing.poll()))
}

//Tries to do it immediately, if fail return false
def offer(e: E): Boolean = {
  notNull(e)
  lock(notEmpty.signalIf(!full && backing.offer(e)))
}

//Tries to remove the head of the queue immediately, if fail, return null
override def poll(): E = lock(notFull.signalIfNotNull(() => backing.poll()))

@jedws
Copy link

jedws commented Mar 14, 2011

@viktorklang
Copy link
Author

Problem is that then you need to copy paste a lot of code, and sooner or later, that has some other deps, and then you need to pull in even more stuff.

I'd say that there's a market for a self-contained jsr166-test.jar :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment