Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created March 11, 2014 15:05
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pchiusano/9487697 to your computer and use it in GitHub Desktop.
Save pchiusano/9487697 to your computer and use it in GitHub Desktop.
Using `mergeN` from scalaz-stream
package scalaz.stream
package examples
import scalaz.concurrent.Task
object MergeAndConnect extends App {
val lines = Process.range(0,10).map("line" + _.toString)
val cnt = new java.util.concurrent.atomic.AtomicInteger(0)
val printme: Channel[Task, String, Int] = io.resource(Task.delay(())) {
resource => Task.delay(()) } /* nothing to close */ {
resource => Task.delay {
case s => Task {
val n = cnt.incrementAndGet()
println(s"${Thread.currentThread().getName} - String = $s, Counter = $n")
Thread.sleep((math.random * 500).toInt)
n
}
}
}
/*
* Rather than using `.through`, which is deterministic, we'll just use
* regular zipping to feed the channel. We can decide later how much
* nondeterminism to allow when sequencing the actions.
*/
val actions: Process[Task,Task[Int]] =
lines.zipWith(printme)((line,chan) => chan(line))
/* Just convert the inner `Task` to a `Process`. */
val nestedActions: Process[Task, Process[Task,Int]] =
actions.map(Process.eval)
/*
* `mergeN` has the same sort of signature as `join`, but allows for
* nondeterminism, rather than just concatenating the streams in order.
*/
val concurrentActions: Process[Task,Int] =
merge.mergeN(2)(nestedActions) // allows only two 'open' streams
println { concurrentActions.runLog.run }
}
@pchiusano
Copy link
Author

And here's output:

[info] Running scalaz.stream.examples.MergeAndConnect 
pool-44-thread-1 - String = line0, Counter = 1
pool-45-thread-1 - String = line1, Counter = 2
pool-42-thread-1 - String = line2, Counter = 3
pool-45-thread-1 - String = line3, Counter = 4
pool-44-thread-1 - String = line4, Counter = 5
pool-44-thread-1 - String = line5, Counter = 6
pool-45-thread-1 - String = line6, Counter = 7
pool-44-thread-1 - String = line7, Counter = 8
pool-45-thread-1 - String = line8, Counter = 9
pool-45-thread-1 - String = line9, Counter = 10
Vector(2, 3, 1, 5, 4, 6, 7, 9, 8, 10)

If we change that to mergeN(1), this is equivalent to just doing nestedActions.flatMap(identity):

[info] Running scalaz.stream.examples.MergeAndConnect 
pool-49-thread-1 - String = line0, Counter = 1
pool-50-thread-1 - String = line1, Counter = 2
pool-47-thread-1 - String = line2, Counter = 3
pool-47-thread-1 - String = line3, Counter = 4
pool-48-thread-1 - String = line4, Counter = 5
pool-49-thread-1 - String = line5, Counter = 6
pool-49-thread-1 - String = line6, Counter = 7
pool-47-thread-1 - String = line7, Counter = 8
pool-47-thread-1 - String = line8, Counter = 9
pool-50-thread-1 - String = line9, Counter = 10
Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment