Skip to content

Instantly share code, notes, and snippets.

@gwenzek
Last active August 29, 2015 14:05
Show Gist options
  • Save gwenzek/649444d583d8dc5e1596 to your computer and use it in GitHub Desktop.
Save gwenzek/649444d583d8dc5e1596 to your computer and use it in GitHub Desktop.
Producter and consumer in scala
import java.util.concurrent.ArrayBlockingQueue
/**
* in answer to: http://stackoverflow.com/questions/25344130/modelling-producer-consumer-semantics-with-typeclasses/25448892#25448892
* adapted from: http://matt.might.net/articles/pipelined-nonblocking-extensible-web-server-with-coroutines/
*/
trait Coroutine extends Runnable {
def start() {
val myThread = new Thread(this)
myThread.start()
}
}
trait Producer[O] extends Coroutine {
private val outputs = new ArrayBlockingQueue[O](1024)
protected def put(output: O): Unit = outputs.put(output)
def next(): O = outputs.take()
def ==>[I >: O](consumer: Consumer[I]): Coroutine = {
val that = this
new Coroutine {
def run() {
while (true) {
val o = that.next(); consumer.accept(o)
}
}
override def start() {
that.start()
consumer.start()
super.start()
}
}
}
def ==>[I >: O, O2] (t : Transducer[I,O2]): Producer[O2] = {
val that = this ;
new Producer[O2] {
def run () {
while (true) {
val o = that.next()
t.accept(o)
put(t.next())
}
}
override def start () {
that.start() ;
t.start() ;
super.start() ;
}
}
}
}
trait Consumer[I] extends Coroutine {
private val inputs = new ArrayBlockingQueue[I](1024)
def accept(input: I): Unit = inputs.put(input)
protected def get(): I = inputs.take()
}
trait Transducer[I,O] extends Consumer[I] with Producer[O]
object Producer {
def apply[O](coll: Iterable[O]) = new Producer[O] {
def run(): Unit = {
val it = coll.toIterator
while(it.hasNext) put(it.next())
}
}
}
object Consumer {
def apply[I](f: I => Unit) = new Consumer[I] {
def run(): Unit = while(true) f(get())
}
}
object Transducer {
def apply[I, O](f: I => O) = new Transducer[I, O]{
def run(): Unit = while(true) put(f(get()))
}
}
case class IntProducer(zero: Int) extends Producer[Int] {
def run(): Unit = {
var i = zero
while (true) {
put(i); i += 1
}
}
}
object FloatConverter extends Transducer[Int, Float]{
def run(): Unit = {
while (true) { put(get() * 1.0f) }
}
}
object Printer extends Consumer[Any] {
def run(): Unit = {
while (true) {
println(get())
}
}
}
object Main extends App {
// val pip = IntProducer(0) ==> FloatConverter ==> Printer
val pip = Producer(1 to 10) ==> Transducer((x: Int) => x.toFloat) ==> Printer
pip.start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment