Skip to content

Instantly share code, notes, and snippets.

@marcel
Created March 24, 2011 22:43
Show Gist options
  • Save marcel/886048 to your computer and use it in GitHub Desktop.
Save marcel/886048 to your computer and use it in GitHub Desktop.
Abstracts away the common pattern of producing items into a queue that are consumed concurrently by a pool of workers.
import java.util.concurrent.{BlockingQueue, ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit}
/**
 * Abstracts away the common pattern of producing items into a queue that are
 * consumed concurrently by a pool of workers.
 *
 * A dedicated thread drains `producer` into `queue` while `concurrencyLevel`
 * workers take items off the queue and hand them to the consumer function.
 *
 * An instance is single-use: [[run]] shuts the worker pool down when it
 * finishes, so a second call would have its tasks rejected by the pool.
 *
 * @param queue             buffer between the producer thread and the workers
 * @param producer          source of items; iterated on a dedicated thread
 * @param concurrencyLevel  number of worker threads consuming from `queue`
 * @param pollTimeoutMillis how long an idle worker waits for an item before
 *                          re-checking whether the producer has finished
 *                          (defaults to the previously hard-coded 500 ms)
 * @tparam T type of the items flowing through the queue
 */
class ConcurrentBlockingQueueConsumer[T](
    queue: BlockingQueue[T],
    producer: Iterator[T],
    concurrencyLevel: Int,
    pollTimeoutMillis: Long = 500L) {

  import scala.util.control.NonFatal

  // A zero or negative timeout would make poll() return immediately and turn
  // every idle worker into a busy spin loop.
  require(pollTimeoutMillis > 0, "pollTimeoutMillis must be positive")

  /** Counted down exactly once, when the producer thread has stopped producing. */
  lazy val stopLatch: CountDownLatch = new CountDownLatch(1)

  /** Fixed-size pool that runs the `concurrencyLevel` workers. */
  lazy val pool: java.util.concurrent.ExecutorService =
    Executors.newFixedThreadPool(concurrencyLevel)

  /**
   * Starts the producer thread and the workers, then blocks the calling thread
   * until the producer is exhausted and every worker has finished.
   *
   * @param consumer invoked once per produced item, on a worker thread
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  def run()(consumer: (T) => Unit): Unit = {
    val producerThread = newProducerThread(producer)
    producerThread.start()
    try {
      (1 to concurrencyLevel).foreach { _ =>
        pool.submit(newWorker(consumer))
      }
      producerThread.join()
      stopLatch.await()
    } finally {
      // Always release the pool so its non-daemon threads cannot outlive a failed run.
      pool.shutdown()
    }
    // Wait for as long as it takes: returning after a single bounded wait would
    // let the caller proceed while items are still being processed.
    while (!pool.awaitTermination(1, TimeUnit.SECONDS)) {}
  }

  /** Builds the (not yet started) thread that copies every produced item into the queue. */
  private def newProducerThread(producer: Iterator[T]): Thread =
    new Thread(new Runnable {
      def run(): Unit = {
        try {
          producer.foreach(item => queue.put(item))
        } finally {
          // Signal completion even if the iterator or put() throws; otherwise
          // run() would wait on the latch forever.
          stopLatch.countDown()
        }
      }
    })

  /** Builds a worker that keeps consuming until the producer is done and the queue is drained. */
  private def newWorker(consumer: (T) => Unit): Runnable =
    new Runnable {
      def run(): Unit = {
        try {
          while (stopLatch.getCount() > 0 || !queue.isEmpty) {
            // poll() yields null when the timeout elapses with nothing queued;
            // Option(...) filters that out so the consumer never receives null.
            Option(queue.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS)).foreach { item =>
              try {
                consumer(item)
              } catch {
                // Report the failure and keep the worker alive. Letting it escape
                // would be swallowed by the discarded Future and silently end this
                // worker, which can leave the producer blocked on a full queue.
                case NonFatal(e) =>
                  val current = Thread.currentThread()
                  current.getUncaughtExceptionHandler.uncaughtException(current, e)
              }
            }
          }
        } catch {
          // Stop consuming, but preserve the interrupt status for the pool thread.
          case _: InterruptedException => Thread.currentThread().interrupt()
        }
      }
    }
}
/** Convenience entry points that build a consumer and run it immediately. */
object ConcurrentBlockingQueueConsumer {

  /**
   * Runs a producer/consumer pipeline with one worker per slot of `queue`.
   *
   * The worker count is the queue's capacity, computed as its current size
   * plus its remaining capacity.
   *
   * @param queue    bounded buffer between the producer and the workers
   * @param producer source of items
   * @param consumer invoked once per item
   */
  def apply[T](queue: ArrayBlockingQueue[T], producer: Iterator[T])(consumer: (T) => Unit): Unit = {
    val capacity = queue.size + queue.remainingCapacity()
    apply[T](queue, producer, capacity)(consumer)
  }

  /**
   * Runs a producer/consumer pipeline with an explicit number of workers.
   *
   * @param queue            buffer between the producer and the workers
   * @param producer         source of items
   * @param concurrencyLevel number of workers consuming from `queue`
   * @param consumer         invoked once per item
   */
  def apply[T](queue: BlockingQueue[T], producer: Iterator[T], concurrencyLevel: Int)(consumer: (T) => Unit): Unit = {
    val pipeline = new ConcurrentBlockingQueueConsumer(queue, producer, concurrencyLevel)
    pipeline.run()(consumer)
  }
}
/** Example: echoes every line read from standard input using a pool of workers. */
object ConcurrentBlockingQueueConsumerExample {

  /**
   * Feeds the lines of standard input through a 4-slot queue and prints each one.
   *
   * Lines are printed from concurrent workers, so the output order is not
   * guaranteed to match the input order.
   */
  def main(args: Array[String]): Unit = {
    val stdinLines = io.Source.fromInputStream(System.in).getLines()
    val buffer = new ArrayBlockingQueue[String](4)
    ConcurrentBlockingQueueConsumer[String](buffer, stdinLines) { line =>
      println(line.stripLineEnd)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment