I've been playing with Scalaz-Streams (0.7.3 and 0.8) and noticed some curious behaviour that I don't understand. As an experiment, I am creating multiple processes and using the output of one process as the input for the next. In doing so, I seem to blow the stack. Initially I thought it was because I'm appending the stream with a recursive call to the function that creates the stream. But I discovered that if I replace the flatMap in the first example below with map/mergeN, it stops blowing the stack. Curiously, though, map/mergeN is much, much slower than flatMap. So why does flatMap blow the stack, and why is mergeN so slow?
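The chaining described above, where each stage is built from the previous one, can be modeled without scalaz-stream. This is a hypothetical sketch. It assumes only what the infiniteInts examples below require: a stage cannot produce its value until the stage it was built from has been evaluated, because it needs streamOfInts.last. The names Step, chain and NestingSketch are made up for illustration. The sketch models only that data dependency and does not claim to show how scalaz-stream steps a Process internally.

```scala
// Hypothetical, library-free model of the stage chaining in infiniteInts.
object NestingSketch {
  // A Step stands in for `streamOfInts.last.flatMap(nextInt)`:
  // it holds the step it was built from (None for the initial emit(0)).
  final case class Step(prev: Option[Step]) {
    // Evaluating a step first evaluates every step before it (non-tail
    // recursive), so the call depth equals the length of the chain.
    def value: Int = prev match {
      case None    => 0
      case Some(p) => p.value + 1
    }
  }

  // Mirrors `next ++ infiniteInts(next)`: the n-th step wraps the (n-1)-th.
  def chain(n: Int): Step =
    (1 to n).foldLeft(Step(None))((acc, _) => Step(Some(acc)))

  def main(args: Array[String]): Unit =
    println(chain(10).value) // prints 10
}
```

In this model, the n-th value costs n nested calls. A chain that is long enough would overflow the stack no matter how each individual call is implemented.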
import scalaz.concurrent.Task
import scalaz.stream._

object Main extends App {
  // Each stage takes the last value of the previous stage and emits it + 1,
  // then recurses with that stage as the input for the next one.
  def infiniteInts(streamOfInts: Process[Task, Int]): Process[Task, Int] = {
    val next: Process[Task, Int] = streamOfInts.last.flatMap(nextInt)
    next ++ infiniteInts(next)
  }

  infiniteInts(Process.emit(0)).observe(print).run.run

  def nextInt(i: Int): Process[Task, Int] = Process.emit(i + 1)

  // Prints each element together with the thread that printed it.
  def print: Sink[Task, Int] =
    scalaz.stream.sink.lift[Task, Int](i => Task.delay(println(s"Thread: ${Thread.currentThread()} $i")))
}
This blows up with a StackOverflowError:
...
...
...
Thread: Thread[run-main-0,5,run-main-group-0] 1470
Thread: Thread[run-main-0,5,run-main-group-0] 1471
Thread: Thread[run-main-0,5,run-main-group-0] 1472
Thread: Thread[run-main-0,5,run-main-group-0] 1473
Thread: Thread[run-main-0,5,run-main-group-0] 1474
Thread: Thread[run-main-0,5,run-main-group-0] 1475
Thread: Thread[run-main-0,5,run-main-group-0] 1476
Thread: Thread[run-main-0,5,run-main-group-0] 1477
[error] (run-main-0) java.lang.StackOverflowError
java.lang.StackOverflowError
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at scalaz.stream.Process$.fail(Process.scala:830)
at scalaz.stream.Util$.Try(Util.scala:39)
at scalaz.stream.Process$class.go$1(Process.scala:84)
at scalaz.stream.Process$class.step(Process.scala:97)
at scalaz.stream.Process$Append.step(Process.scala:641)
at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:143)
at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:139)
at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
at scalaz.stream.Util$.Try(Util.scala:38)
at scalaz.stream.Process$class.flatMap(Process.scala:46)
at scalaz.stream.Process$Emit.flatMap(Process.scala:592)
at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
at scalaz.Free.resume(Free.scala:72)
at scalaz.Free.go2$1(Free.scala:118)
at scalaz.Free.go(Free.scala:122)
at scalaz.Free.run(Free.scala:172)
at scalaz.stream.Process$$anonfun$go$1$1.apply(Process.scala:84)
at scalaz.stream.Process$$anonfun$go$1$1.apply(Process.scala:84)
at scalaz.stream.Util$.Try(Util.scala:38)
at scalaz.stream.Process$class.go$1(Process.scala:84)
at scalaz.stream.Process$class.step(Process.scala:97)
at scalaz.stream.Process$Append.step(Process.scala:641)
at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:143)
at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:139)
at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
at scalaz.stream.Util$.Try(Util.scala:38)
at scalaz.stream.Process$class.flatMap(Process.scala:46)
at scalaz.stream.Process$Emit.flatMap(Process.scala:592)
at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
at scalaz.Free.resume(Free.scala:72)
at scalaz.Free.go2$1(Free.scala:118)
at scalaz.Free.go(Free.scala:122)
at scalaz.Free.run(Free.scala:172)
...
...
...
However, when I replace the flatMap with map followed by merge.mergeN, it doesn't blow the stack, but it runs much, much slower:
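import scalaz.concurrent.Task
import scalaz.stream._

object Main extends App {
  // Same as before, except that map turns each value into an inner Process
  // (giving a Process of Processes), and merge.mergeN runs those inner
  // processes and merges their output into a single stream.
  def infiniteInts(streamOfInts: Process[Task, Int]): Process[Task, Int] = {
    val next: Process[Task, Int] = merge.mergeN(streamOfInts.last.map(nextInt))
    next ++ infiniteInts(next)
  }

  infiniteInts(Process.emit(0)).observe(print).run.run

  def nextInt(i: Int): Process[Task, Int] = Process.emit(i + 1)

  // Prints each element together with the thread that printed it.
  def print: Sink[Task, Int] =
    scalaz.stream.sink.lift[Task, Int](i => Task.delay(println(s"Thread: ${Thread.currentThread()} $i")))
}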
So the question is: why does flatMap blow the stack (when it should be trampolined), and why doesn't map/mergeN blow the stack, especially since map is implemented in terms of flatMap?