Created
March 11, 2014 15:05
-
-
Save pchiusano/9487697 to your computer and use it in GitHub Desktop.
Using `mergeN` from scalaz-stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalaz.stream | |
package examples | |
import scalaz.concurrent.Task | |
object MergeAndConnect extends App { | |
val lines = Process.range(0,10).map("line" + _.toString) | |
val cnt = new java.util.concurrent.atomic.AtomicInteger(0) | |
val printme: Channel[Task, String, Int] = io.resource(Task.delay(())) { | |
resource => Task.delay(()) } /* nothing to close */ { | |
resource => Task.delay { | |
case s => Task { | |
val n = cnt.incrementAndGet() | |
println(s"${Thread.currentThread().getName} - String = $s, Counter = $n") | |
Thread.sleep((math.random * 500).toInt) | |
n | |
} | |
} | |
} | |
/* | |
* Rather than using `.through`, which is deterministic, we'll just use | |
* regular zipping to feed the channel. We can decide later how much | |
* nondeterminism to allow when sequencing the actions. | |
*/ | |
val actions: Process[Task,Task[Int]] = | |
lines.zipWith(printme)((line,chan) => chan(line)) | |
/* Just convert the inner `Task` to a `Process`. */ | |
val nestedActions: Process[Task, Process[Task,Int]] = | |
actions.map(Process.eval) | |
/* | |
* `mergeN` has the same sort of signature as `join`, but allows for | |
* nondeterminism, rather than just concatenating the streams in order. | |
*/ | |
val concurrentActions: Process[Task,Int] = | |
merge.mergeN(2)(nestedActions) // allows only two 'open' streams | |
println { concurrentActions.runLog.run } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
And here's output:
If we change that to
mergeN(1)
, this is equivalent to just doingnestedActions.flatMap(identity)
: