Skip to content

Instantly share code, notes, and snippets.

Created December 20, 2012 20:52
Show Gist options
  • Save anonymous/4348460 to your computer and use it in GitHub Desktop.
Save anonymous/4348460 to your computer and use it in GitHub Desktop.
Thread-safe summing buffer using only ArrayBlockingQueue and Monoids.
/**
 * Offers `item` to the queue without blocking.
 *
 * @param item the value to buffer
 * @return `None` when the queue accepted the item; otherwise `Some(sum)`, where `sum`
 *         combines everything drained from the full queue followed by `item` itself
 */
def addOrSum(item: V): Option[V] = {
  @tailrec
  def offerAll(items: List[V], acc: Option[V] = None): Option[V] =
    items match {
      case Nil => acc
      case head :: tail =>
        if (queue.offer(head)) {
          // The queue accepted the element; nothing to sum for it.
          offerAll(tail, acc)
        } else {
          // The queue rejected `head` (it is full): pull out everything it currently holds.
          // NOTE(review): drainTo expects a java.util.Collection, so this presumably relies on
          // a Scala-to-Java collection conversion imported elsewhere in the file -- confirm.
          val drained = ListBuffer[V]()
          queue.drainTo(drained)
          // `head` never made it into the queue, so it is appended after the drained
          // elements; this keeps arrival order for non-commutative monoids.
          drained += head
          // Sum what was actually drained (plus `head`), not the pending input list:
          // summing `items` would discard the buffered values and count `tail` twice.
          val newAcc = Monoid.plus(acc, Some(Monoid.sum(drained)))
          offerAll(tail, newAcc)
        }
    }
  // Start offering everything:
  offerAll(List(item))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment