@ncornwell
Last active August 29, 2015 13:56
Combining two different streams at different frequencies using scalaz-stream
import scalaz.stream._

object Main extends App {
  // Each queue pairs an enqueue side (qX) with a Process that emits what was enqueued (srcX).
  val (qInt, srcInt) = async.queue[Int]
  val (qString, srcString) = async.queue[String]

  val repsInt = 5
  val repsString = 5

  // Fast producer: one random Int every 100 ms, then close the queue.
  new Thread(new Runnable {
    def run(): Unit = {
      (0 until repsInt).foreach { _ =>
        Thread.sleep(100)
        val result = (math.random * 100).toInt
        qInt.enqueue(result)
      }
      qInt.close
    }
  }).start()

  // Slow producer: one String every 300 ms, then close the queue.
  new Thread(new Runnable {
    def run(): Unit = {
      (0 until repsString).foreach { _ =>
        Thread.sleep(300)
        val result = (math.random * 100).toInt
        qString.enqueue("String Rep:" + result.toString)
      }
      qString.close
    }
  }).start()

  // Merge both sources as they emit, then collect everything once both queues are closed.
  val result = srcInt
    .either(srcString)
    .map(e => e.fold(i => "Int Rep:" + i.toString, s => s)) // arbitrary method to combine streams
    .runLog
    .run

  // Every element from both producers must arrive exactly once.
  assert(result.length == (repsInt + repsString))
  assert(result.count(_.startsWith("String")) == repsString)
  assert(result.count(_.startsWith("Int")) == repsInt)
}
@pchiusano

You can write the two producers with:

import scala.concurrent.duration._

val produceInts = 
  Process.awakeEvery(100 milliseconds).map(_ => qInt.enqueue((math.random * 100).toInt)).take(repsInt) ++
  Process.suspend { qInt.close }
val produceStrings = 
  Process.awakeEvery(300 milliseconds).map(_ => ...).take(repsString) ++
  Process.suspend { qString.close }

produceInts.run.runAsync { _ => () }
produceStrings.run.runAsync { _ => () }

The first call, run, returns a Task; the second, runAsync, takes a callback of type Throwable \/ A => Unit. The call to runAsync is nonblocking as long as the process itself is nonblocking, which it is here because it is built with awakeEvery.
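
To make the callback shape concrete, here is a minimal sketch that uses only the Scala standard library rather than scalaz. It assumes Either[Throwable, A] as a stand-in for Throwable \/ A, and runAsyncSketch is a hypothetical helper, not part of scalaz-stream. It shows the same pattern: the call returns immediately and the callback later receives either the failure or the value.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object CallbackSketch {
  // Hypothetical helper (not a scalaz API): starts `work` on another thread and
  // returns immediately; `cb` later receives Left(error) or Right(value).
  def runAsyncSketch[A](work: => A)(cb: Either[Throwable, A] => Unit): Unit =
    Future(work).onComplete {
      case Success(a) => cb(Right(a))
      case Failure(t) => cb(Left(t))
    }

  def main(args: Array[String]): Unit = {
    val done = Promise[Either[Throwable, Int]]()
    runAsyncSketch { Thread.sleep(100); 42 } { r => done.success(r) }
    println("runAsyncSketch returned; the work is still running")
    // Block only here, at the edge of the program, to observe the callback's result.
    println(Await.result(done.future, 2.seconds))
  }
}
```

In the comment's snippet the callback is { _ => () }, which discards both the result and any error, so a failing producer would go unnoticed.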

Another thing you can do in this instance, rather than running the two producers, is to do:

(produceInts.drain ++ produceStrings.drain).merge(srcInt.either(srcString).map(e => ...))

So, a.drain.merge(b) will in general run a just for its effects concurrently with the values of b being emitted.
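
As a rough analogy for this, the sketch below uses plain threads and a java.util.concurrent.LinkedBlockingQueue instead of scalaz-stream. The producers run only for their effect of enqueueing, while the consumer sees the merged values. The names MergeSketch, producer, and collect are hypothetical, and the None end marker is an assumption standing in for closing a queue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

object MergeSketch {
  type Elem = Option[Either[Int, String]]

  // Starts a thread that emits `reps` values, one every `periodMs` milliseconds,
  // followed by a None end marker. It is run purely for its effects.
  def producer(out: LinkedBlockingQueue[Elem], reps: Int, periodMs: Long)
              (mk: Int => Either[Int, String]): Unit = {
    val t = new Thread(new Runnable {
      def run(): Unit = {
        (0 until reps).foreach { i =>
          Thread.sleep(periodMs)
          out.put(Some(mk(i)))
        }
        out.put(None)
      }
    })
    t.start()
  }

  // Consumes the merged output until both producers have sent their end marker.
  def collect(repsInt: Int, repsString: Int): List[String] = {
    val out = new LinkedBlockingQueue[Elem]()
    producer(out, repsInt, 10)(i => Left(i))
    producer(out, repsString, 30)(i => Right("String Rep:" + i))
    val buf = ListBuffer.empty[String]
    var open = 2
    while (open > 0) {
      out.take() match {
        case Some(e) => buf += e.fold(i => "Int Rep:" + i, s => s); ()
        case None    => open -= 1
      }
    }
    buf.toList
  }

  def main(args: Array[String]): Unit =
    collect(5, 5).foreach(println)
}
```

Unlike the Process version, this blocks the consuming thread in take(), so it only illustrates the data flow, not the nonblocking behaviour.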

Also, the API for queue may change slightly in the near future - we are going to have enqueue return a Task rather than having the side effect directly, which will be useful for implementing a 'nonblocking' finite queue. Of course, nothing would stop you from running that Task immediately.
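
The difference between enqueueing as a side effect and returning a description of the effect can be sketched without scalaz. Here Deferred is a hypothetical stand-in for Task, and SketchQueue is an illustrative wrapper around a ConcurrentLinkedQueue, not the scalaz-stream queue. Nothing is enqueued until run() is called.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object DeferredEnqueueSketch {
  // Hypothetical stand-in for Task: describes an effect, performs it only on run().
  final class Deferred[A](thunk: () => A) {
    def run(): A = thunk()
  }

  // Illustrative queue whose enqueue returns a description instead of acting.
  final class SketchQueue[A] {
    private val underlying = new ConcurrentLinkedQueue[A]()
    def enqueue(a: A): Deferred[Unit] =
      new Deferred(() => { underlying.add(a); () })
    def size: Int = underlying.size
  }

  def main(args: Array[String]): Unit = {
    val q = new SketchQueue[Int]
    val pending = q.enqueue(1) // nothing has been enqueued yet
    println(q.size)            // prints 0
    pending.run()              // the effect happens here
    println(q.size)            // prints 1
    q.enqueue(2).run()         // "running that Task immediately"
    println(q.size)            // prints 2
  }
}
```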
