Skip to content

Instantly share code, notes, and snippets.

@cwmyers
Last active May 12, 2016 08:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cwmyers/71c442042eb849fe4fcdee0fa79938a2 to your computer and use it in GitHub Desktop.
Save cwmyers/71c442042eb849fe4fcdee0fa79938a2 to your computer and use it in GitHub Desktop.
FlatMap is blowing the stack

I've been playing with scalaz-stream (0.7.3 and 0.8) and noticed some curious behaviour which I don't understand. As an experiment I am creating multiple processes and using the output of one process as the input for the next process. In doing so, I seem to blow the stack. Initially I thought it was because I'm appending to the stream a recursive call to the function that creates the stream. But I discovered that when I replaced the flatMap in the first example below with map/mergeN, it stopped blowing the stack. Curiously though, using map/mergeN was waaaaaaaay slower than using flatMap. So, why is flatMap blowing the stack and why is mergeN so slow?

import scalaz.concurrent.Task
import scalaz.stream._


/**
 * Minimal reproduction (flatMap variant): builds an endless stream of
 * increasing integers where every new segment is derived from the previous
 * segment via `last.flatMap`, and prints each value through a sink.
 *
 * The author reports that this program prints a run of increasing values and
 * then dies with `java.lang.StackOverflowError`.
 */
object Main extends App {

  /**
   * Builds a stream consisting of a segment derived from `streamOfInts`,
   * followed by the stream obtained by recursing on that same segment.
   *
   * @param streamOfInts the previous segment; only its `last` element is used
   * @return `next` followed by `infiniteInts(next)`
   */
  def infiniteInts(streamOfInts: Process[Task, Int]): Process[Task, Int] = {
    // `next` is defined in terms of the previous segment, so on every level of
    // recursion the process passed down wraps all the earlier ones.
    // NOTE(review): this ever-deepening chain is presumably what the reported
    // StackOverflowError comes from -- not confirmed.
    val next: Process[Task, Int] = streamOfInts.last.flatMap(nextInt)
    // NOTE(review): the recursive call presumably is evaluated lazily by `++`
    // (otherwise building the stream would never finish) -- confirm against
    // the scalaz-stream `++`/`append` signature.
    next ++ infiniteInts(next)
  }

  // Entry point of the reproduction: seed with 0, route every element through
  // the `print` sink, then run. NOTE(review): the first `run` presumably turns
  // the Process into a Task and the second executes that Task -- confirm.
  infiniteInts(Process.emit(0)).observe(print).run.run

  /** Returns a stream that emits the single value `i + 1`. */
  def nextInt(i: Int): Process[Task, Int] = Process.emit(i + 1)

  /**
   * Sink that prints the current thread and the received value, one line per
   * element. Inside `Main` this member is what the bare name `print` refers
   * to, rather than `Predef.print`.
   */
  def print: Sink[Task, Int] = scalaz.stream.sink.lift[Task, Int](i => Task.delay(println(s"Thread: ${Thread.currentThread()} $i")))

}

Blows up with a Stack Overflow:

...
...
...
Thread: Thread[run-main-0,5,run-main-group-0] 1470
Thread: Thread[run-main-0,5,run-main-group-0] 1471
Thread: Thread[run-main-0,5,run-main-group-0] 1472
Thread: Thread[run-main-0,5,run-main-group-0] 1473
Thread: Thread[run-main-0,5,run-main-group-0] 1474
Thread: Thread[run-main-0,5,run-main-group-0] 1475
Thread: Thread[run-main-0,5,run-main-group-0] 1476
Thread: Thread[run-main-0,5,run-main-group-0] 1477
[error] (run-main-0) java.lang.StackOverflowError
java.lang.StackOverflowError
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at scalaz.stream.Process$.fail(Process.scala:830)
	at scalaz.stream.Util$.Try(Util.scala:39)
	at scalaz.stream.Process$class.go$1(Process.scala:84)
	at scalaz.stream.Process$class.step(Process.scala:97)
	at scalaz.stream.Process$Append.step(Process.scala:641)
	at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:143)
	at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:139)
	at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
	at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
	at scalaz.stream.Util$.Try(Util.scala:38)
	at scalaz.stream.Process$class.flatMap(Process.scala:46)
	at scalaz.stream.Process$Emit.flatMap(Process.scala:592)
	at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
	at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
	at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
	at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
	at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
	at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
	at scalaz.Free.resume(Free.scala:72)
	at scalaz.Free.go2$1(Free.scala:118)
	at scalaz.Free.go(Free.scala:122)
	at scalaz.Free.run(Free.scala:172)
	at scalaz.stream.Process$$anonfun$go$1$1.apply(Process.scala:84)
	at scalaz.stream.Process$$anonfun$go$1$1.apply(Process.scala:84)
	at scalaz.stream.Util$.Try(Util.scala:38)
	at scalaz.stream.Process$class.go$1(Process.scala:84)
	at scalaz.stream.Process$class.step(Process.scala:97)
	at scalaz.stream.Process$Append.step(Process.scala:641)
	at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:143)
	at scalaz.stream.Process$$anonfun$pipe$1.apply(Process.scala:139)
	at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
	at scalaz.stream.Process$$anonfun$flatMap$1.apply(Process.scala:46)
	at scalaz.stream.Util$.Try(Util.scala:38)
	at scalaz.stream.Process$class.flatMap(Process.scala:46)
	at scalaz.stream.Process$Emit.flatMap(Process.scala:592)
	at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
	at scalaz.stream.Process$$anonfun$flatMap$4.apply(Process.scala:48)
	at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
	at scalaz.Free$$anonfun$map$1.apply(Free.scala:52)
	at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
	at scalaz.Free$$anonfun$flatMap$1$$anonfun$apply$1.apply(Free.scala:60)
	at scalaz.Free.resume(Free.scala:72)
	at scalaz.Free.go2$1(Free.scala:118)
	at scalaz.Free.go(Free.scala:122)
	at scalaz.Free.run(Free.scala:172)
	...
	...
	...

However, when I replace the flatMap with map/mergeN, it doesn't blow the stack but runs waaaaaay slower.

import scalaz.concurrent.Task
import scalaz.stream._


/**
 * Minimal reproduction (map/mergeN variant): same program shape as the
 * flatMap version, but each new segment is produced by mapping the last
 * element to a stream and flattening the result with `merge.mergeN`.
 *
 * The author reports that this variant does not overflow the stack but runs
 * much more slowly than the flatMap variant.
 */
object Main extends App {

  /**
   * Builds a stream consisting of a segment derived from `streamOfInts`,
   * followed by the stream obtained by recursing on that same segment.
   *
   * @param streamOfInts the previous segment; only its `last` element is used
   * @return `next` followed by `infiniteInts(next)`
   */
  def infiniteInts(streamOfInts: Process[Task, Int]): Process[Task, Int] = {
    // `last.map(nextInt)` yields a stream of streams (nextInt returns a
    // Process); `mergeN` collapses it back into a Process[Task, Int].
    // NOTE(review): mergeN presumably runs the inner streams concurrently,
    // which may explain both the absence of the overflow and the slowdown --
    // confirm against the scalaz-stream merge documentation.
    val next: Process[Task, Int] = merge.mergeN(streamOfInts.last.map(nextInt))
    // As in the flatMap variant, each level of recursion receives a process
    // built on top of all the earlier ones.
    next ++ infiniteInts(next)
  }


  // Entry point of the reproduction: seed with 0, route every element through
  // the `print` sink, then run. NOTE(review): the first `run` presumably turns
  // the Process into a Task and the second executes that Task -- confirm.
  infiniteInts(Process.emit(0)).observe(print).run.run

  /** Returns a stream that emits the single value `i + 1`. */
  def nextInt(i: Int): Process[Task, Int] = Process.emit(i + 1)

  /**
   * Sink that prints the current thread and the received value, one line per
   * element. Inside `Main` this member is what the bare name `print` refers
   * to, rather than `Predef.print`.
   */
  def print: Sink[Task, Int] = scalaz.stream.sink.lift[Task, Int](i => Task.delay(println(s"Thread: ${Thread.currentThread()} $i")))

}

So the question is, why does flatMap blow the stack (when it should be trampolined) and why doesn't map/mergeN blow the stack, especially since map is implemented in terms of flatMap?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment