@timperrett
Last active February 22, 2016 15:20
/* I discovered njoin via this thread: https://github.com/functional-streams-for-scala/fs2/issues/251, which also explains its two parameters (wording adjusted to match the example below):
   If we increase maxQueued to 100, and `handle` emits multiple `A` values, up to 100 of them will be prefetched
   before we block on the consumer of results. (Not actually blocking by occupying a thread, but semantically blocking.)
   If we increase maxOpen to 50, then up to 50 per-item streams can be processing concurrently.
*/
import scalaz.stream.{Sink,Process,sink}
import scalaz.stream.nondeterminism.njoin
import scalaz.concurrent.Task

case class Foo(arb: Int)

val items: Process[Task,Foo] =
  Process.emitAll(for(i <- 0 to 10000) yield Foo(i))

val rnd = new scala.util.Random

val effects: Sink[Task, Foo] =
  sink.lift(foo => Task.delay {
    // add some delay so the numbers obviously process out of order when printed to the console.
    // never do this in your real code!!
    val start = 100
    val end = 400
    Thread.sleep(start + rnd.nextInt( (end - start) + 1 ))
    println(s"processing $foo")
  })

def handle(item: Foo): Process[Task,Unit] =
  Process.emit(item).to(effects)

val handled: Process[Task,Process[Task,Unit]] = items.map(handle)

val results: Process[Task,Unit] =
  njoin(maxOpen = 10, maxQueued = 1)(handled)

// print all the things to the console!
results.run.run

/*
You should then see the following output (or something like it, at least):
processing Foo(2)
processing Foo(1)
processing Foo(5)
processing Foo(3)
processing Foo(4)
processing Foo(0)
processing Foo(9)
processing Foo(6)
processing Foo(7)
processing Foo(10)
processing Foo(8)
....
*/
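
/*
A minimal sketch of the tuning described in the header comment, reusing `handled` from above.
The name `tuned` and the specific limits (50 and 100) are illustrative assumptions, not measured
recommendations: with maxOpen = 50, up to 50 inner streams may be open at once, and with
maxQueued = 100, up to 100 results may be buffered before the producers semantically block.
*/
val tuned: Process[Task,Unit] =
  njoin(maxOpen = 50, maxQueued = 100)(handled)

// Run it the same way as `results`; the output order should again be nondeterministic.
tuned.run.run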