Created
February 24, 2014 16:24
-
-
Save jessitron/9191499 to your computer and use it in GitHub Desktop.
StackOverflow in scalaz-stream. My trampoline is not bouncy enough
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz.stream._ | |
import Process._ | |
import scalaz.concurrent.Task | |
import scala.concurrent.duration._ | |
// git bisect identifies the offending commit as | |
// https://github.com/scalaz/scalaz-stream/commit/721716ed7af0c126593e9ee227c0f36f21c5b7ed | |
object Test { | |
def main(args: Array[String]) { | |
// this could be a constant process, this doesn't appear to be required | |
val triggers: Process[Task, Duration] = Process.awakeEvery(1.second).take(10) | |
// this can also be a constant, delay not required | |
val randomStrings: Process[Task, String] = Process.eval{ Task.delay{ "boo!" }}.repeat.take(50) | |
// no SOE without the tee AND the every -- removing one fixes it | |
val slowStrings: Process[Task,String] = Process.every(400.millis).tee(randomStrings)(tee.when) | |
// the suspicious wye | |
def combined: Process[Task, Any] = slowStrings.wye(triggers)(wye.merge[Any]) | |
// SOE. Free monads all over the stack, entering go() each time | |
combined.runLog.run | |
} | |
} |
There's definitely asyncing going on here. I suggest adding an await in front of all your processes p
:
await(Task(()), _ => p)
If that helps, remove them one by one to find the one that breaks. Something is too strict here.
OK, that doesn't help. Honestly, I think the bug is in WyeActor, where Task.async is being called. Async will actually call Trampoline.run, which is not stack safe unless the asynchronous task runs in a separate thread.
Yeah, that fixes it.
In WyeActor.scala:
Task.async[A] { (cb: Throwable \/ A => Unit) =>
val running = RunningTaskImpl(cb)
runningTask.set(Some(running))
if (interrupted.get) interrupt()
else req.runAsyncInterruptibly(r => runningTask.getAndSet(None).foreach(_ => running.complete(r)), interrupted)
}
I changed that to:
Task.fork(Task.async {
...
})
And the SOE goes away.
Ah sweet!!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Stack trace, for reference: