Skip to content

Instantly share code, notes, and snippets.



Last active Jun 13, 2016
What would you like to do?
scalaz-stream example of a problem, written for a Stack Overflow question
import java.util.concurrent.Executors
import scalaz.concurrent.{Strategy, Task}
/**
 * Shared building blocks used by the batch-job examples in this file:
 * a source of strings, two slow processing stages and a printing sink.
 */
object Processes {

  /** Emits the integers 0 to 9 (inclusive), each converted to its string form. */
  val source: Process[Task, String] = Process.emitAll(0 until 10).map(_.toString())

  /** First stage: appends "1" to each element; 100 is passed as the `millis` argument of `Utils.simulatedWork`. */
  val slowProcessor1: Channel[Task, String, String] =
    Utils.simulatedWork(100, data => data + "1")

  /** Second stage: appends "2" to each element; 100 is passed as the `millis` argument of `Utils.simulatedWork`. */
  val slowProcessor2: Channel[Task, String, String] =
    Utils.simulatedWork(100, data => data + "2")

  /** Sink that prints every element it receives with `println`, wrapped in a `Task`. */
  val sink: Sink[Task, String] = Process.constant { a: String => Task { println(a) } }
}
/** The straightforward pipeline: every stage is chained into one process. */
object Simple {
  import Processes._

  /**
   * Pipes `source` through `slowProcessor1`, then `slowProcessor2`, and
   * finally into `sink`, exposing the whole pipeline as a single `Task`.
   */
  val batchJob: Task[Unit] =
    (source through slowProcessor1 through slowProcessor2 to sink).run
}
/**
 * Variant of the pipeline that decouples the two slow stages with a bounded
 * queue: one process fills the queue, a second process drains it.
 */
object WithQueue {
  import Processes._

  /** Queue placed between the two stages, created with a bound of 5. */
  val buffer: Queue[String] = async.boundedQueue[String](5, false)

  /** Channel that enqueues each incoming element onto `buffer`. */
  val enqueueOnBuffer: Channel[Task, String, Unit] = channel.lift(buffer.enqueueOne)

  /**
   * Producer side: runs `source` through `slowProcessor1` and enqueues the
   * results; `buffer.close` is evaluated on completion so the consumer side
   * can terminate once the queue has been drained.
   */
  val slowProcess1AndEnqueueOnBuffer: Process[Task, Unit] =
    (source through slowProcessor1 through enqueueOnBuffer).onComplete(Process.eval(buffer.close))

  /**
   * Combines the producer and the consumer with `Task.gatherUnordered`.
   * The producer task must be part of the sequence: without it nothing
   * feeds `buffer` and the consumer would never receive an element.
   */
  val batchJob: Task[Seq[Unit]] = Task.gatherUnordered(Seq(
    slowProcess1AndEnqueueOnBuffer.run,
    (buffer.dequeue through slowProcessor2 to sink).run
  ))
}
// Helpers shared by the examples: an executor, a simulated slow channel and a timing wrapper.
// NOTE(review): this object looks truncated — closing braces and several body lines are not
// present in this copy; confirm against the original gist before relying on it.
object Utils {
// Initialise really large ExecutorService to ensure Thread.sleep doesn't block work
// Fixed pool of 15 threads created with Strategy.DefaultDaemonThreadFactory; declared implicit.
implicit val es = Executors.newFixedThreadPool(15, Strategy.DefaultDaemonThreadFactory)
// Builds a String-to-String channel from a delay in milliseconds and a transformation function.
// NOTE(review): the body of the Task below and the expression that turns `delayedTask` into a
// Channel are not visible here; presumably it sleeps for `millis` and then applies `work` — TODO confirm.
def simulatedWork(millis: Long, work: String => String): Channel[Task, String, String] = {
val delayedTask: String => Task[String] = input => Task {
// Records a start timestamp and prints the elapsed time in milliseconds.
// NOTE(review): no visible statement actually runs `task` between taking the timestamp and
// printing; presumably that line was lost — TODO confirm.
def runTaskTimed(task: Task[_]): Unit = {
val s = System.nanoTime
// Nanosecond difference divided by 1e6 to report milliseconds.
println("time: " + (System.nanoTime - s) / 1e6 + "ms")
// Entry point: announces the simple batch job and then the queue-based batch job.
// NOTE(review): only the announcements are visible here; the statements that actually run the
// two jobs (and the closing brace) appear to be missing from this copy — TODO confirm.
object Demo extends App {
println("First I will run the simple batch job")
println("Now I will run the batch job utilising a queue")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment