Skip to content

Instantly share code, notes, and snippets.

@caoilte
Last active June 13, 2016 13:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caoilte/3963b9158a55ca333d0a5018c95dceca to your computer and use it in GitHub Desktop.
Save caoilte/3963b9158a55ca333d0a5018c95dceca to your computer and use it in GitHub Desktop.
scalaz-stream example of problem for Stack Overflow question
import java.util.concurrent.Executors
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.async.mutable.Queue
/** Building blocks shared by the batch-job variants: a source, two slow stages and a printing sink. */
object Processes {

  /** Emits the decimal strings for the integers 0 (inclusive) to 10 (exclusive), in order. */
  val source: Process[Task, String] = {
    val numbers = 0 until 10
    Process.emitAll(numbers).map(n => n.toString)
  }

  /** First slow stage: built by `Utils.simulatedWork` with a 100 ms delay; appends "1" to its input. */
  val slowProcessor1: Channel[Task, String, String] =
    Utils.simulatedWork(100, _ + "1")

  /** Second slow stage: built by `Utils.simulatedWork` with a 100 ms delay; appends "2" to its input. */
  val slowProcessor2: Channel[Task, String, String] =
    Utils.simulatedWork(100, _ + "2")

  /** Sink whose every step is a function that prints its argument from inside a `Task`. */
  val sink: Sink[Task, String] = {
    val printLine: String => Task[Unit] = line => Task { println(line) }
    Process.constant(printLine)
  }
}
/** Baseline variant: one linear pipeline with both slow stages chained directly. */
object Simple {
  import Processes._

  /** Task that runs `source` through `slowProcessor1`, then `slowProcessor2`, into `sink`. */
  val batchJob: Task[Unit] = {
    val pipeline = source.through(slowProcessor1).through(slowProcessor2).to(sink)
    pipeline.run
  }
}
/** Queue-based variant: stage one feeds a bounded queue that stage two drains. */
object WithQueue {
  import Processes._

  /**
   * Queue sitting between the two slow stages, created with bound 5.
   * NOTE(review): the meaning of the second argument (`false`) is not visible here — confirm against
   * the `async.boundedQueue` signature of the scalaz-stream version in use.
   */
  val buffer: Queue[String] = async.boundedQueue[String](5, false)

  /** Channel that enqueues each incoming element onto `buffer`. */
  val enqueueOnBuffer: Channel[Task, String, Unit] =
    channel.lift(buffer.enqueueOne)

  /** Producer side: `source` through `slowProcessor1` into `buffer`, closing the queue on completion. */
  val slowProcess1AndEnqueueOnBuffer: Process[Task, Unit] = {
    val producer = source.through(slowProcessor1).through(enqueueOnBuffer)
    producer.onComplete(Process.eval(buffer.close))
  }

  /** Combines the producer task and the consumer task via `Task.gatherUnordered`. */
  val batchJob: Task[Seq[Unit]] = {
    val produce: Task[Unit] = slowProcess1AndEnqueueOnBuffer.run
    // Consumer side: drain the queue through the second slow stage into the sink.
    val consume: Task[Unit] = buffer.dequeue.through(slowProcessor2).to(sink).run
    Task.gatherUnordered(Seq(produce, consume))
  }
}
/** Helpers for the demo: a shared thread pool, a simulated slow stage and a task timer. */
object Utils {

  // Deliberately large pool (15 daemon threads) so the blocking Thread.sleep calls in
  // simulatedWork don't hold up other work. The type is declared explicitly because implicit
  // definitions should not rely on inference; it is the same type that was inferred before.
  implicit val es: java.util.concurrent.ExecutorService =
    Executors.newFixedThreadPool(15, Strategy.DefaultDaemonThreadFactory)

  /**
   * Builds a channel that simulates slow work on each element.
   *
   * @param millis how long each element's task sleeps before doing the work, in milliseconds
   * @param work   transformation applied to the element after the sleep
   * @return a channel whose per-element task sleeps for `millis` and then yields `work(input)`
   */
  def simulatedWork(millis: Long, work: String => String): Channel[Task, String, String] = {
    val delayedTask: String => Task[String] = input =>
      Task {
        Thread.sleep(millis)
        work(input)
      }
    channel.lift(delayedTask)
  }

  /**
   * Runs `task` to completion on the calling thread's behalf and prints the elapsed wall-clock
   * time in milliseconds. The task's result is discarded.
   *
   * @param task the task to run and time
   */
  def runTaskTimed(task: Task[_]): Unit = {
    val startNanos = System.nanoTime
    task.run
    val elapsedMillis = (System.nanoTime - startNanos) / 1e6
    println(s"time: ${elapsedMillis}ms")
  }
}
/** Entry point: times the simple batch job, then the queue-based one. */
object Demo extends App {

  // Prints the banner first, then evaluates and times the job. `job` is by-name so the
  // job expression is only evaluated after the banner has been printed.
  private def announceAndTime(banner: String, job: => Task[_]): Unit = {
    println(banner)
    Utils.runTaskTimed(job)
  }

  announceAndTime("First I will run the simple batch job", Simple.batchJob)
  println()
  announceAndTime("Now I will run the batch job utilising a queue", WithQueue.batchJob)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment