StackOverflow in scalaz-stream. My trampoline is not bouncy enough
import scalaz.stream._
import Process._
import scalaz.concurrent.Task
import scala.concurrent.duration._
// git bisect identifies the offending commit as
// https://github.com/scalaz/scalaz-stream/commit/721716ed7af0c126593e9ee227c0f36f21c5b7ed
object Test {
  def main(args: Array[String]): Unit = {
    // this could be a constant process, this doesn't appear to be required
    val triggers: Process[Task, Duration] = Process.awakeEvery(1.second).take(10)
    // this can also be a constant, delay not required
    val randomStrings: Process[Task, String] = Process.eval { Task.delay { "boo!" } }.repeat.take(50)
    // no SOE without the tee AND the every -- removing one fixes it
    val slowStrings: Process[Task, String] = Process.every(400.millis).tee(randomStrings)(tee.when)
    // the suspicious wye
    def combined: Process[Task, Any] = slowStrings.wye(triggers)(wye.merge[Any])
    // SOE. Free monads all over the stack, entering go() each time
    combined.runLog.run
  }
}

@runarorama runarorama commented Feb 24, 2014

Stack trace, for reference:

    at scalaz.Free.run(Free.scala:174)
    at scalaz.concurrent.Future$$anonfun$async$1$$anonfun$apply$12.apply(Future.scala:321)
    at scalaz.concurrent.Future$$anonfun$async$1$$anonfun$apply$12.apply(Future.scala:321)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(WyeActor.scala:91)
    at scala.Option.foreach(Option.scala:236)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1.apply(WyeActor.scala:91)
    at scalaz.concurrent.Future$$anonfun$runAsyncInterruptibly$1.apply(Future.scala:153)
    at scalaz.concurrent.Future$$anonfun$runAsyncInterruptibly$1.apply(Future.scala:153)
    at scalaz.concurrent.Future$class.listenInterruptibly(Future.scala:92)
    at scalaz.concurrent.Future$Now.listenInterruptibly(Future.scala:197)
    at scalaz.concurrent.Future$class.runAsyncInterruptibly(Future.scala:153)
    at scalaz.concurrent.Future$Now.runAsyncInterruptibly(Future.scala:197)
    at scalaz.concurrent.Task.runAsyncInterruptibly(Task.scala:103)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1.apply(WyeActor.scala:87)
    at scalaz.concurrent.Future$$anonfun$async$1.apply(Future.scala:321)
    at scalaz.concurrent.Future$$anonfun$async$1.apply(Future.scala:321)
    at scalaz.concurrent.Future$class.listen(Future.scala:81)
    at scalaz.concurrent.Future$BindSuspend.listen(Future.scala:200)
    at scalaz.concurrent.Future$$anonfun$listen$1$$anonfun$apply$4.apply(Future.scala:81)
    at scalaz.concurrent.Future$$anonfun$listen$1$$anonfun$apply$4.apply(Future.scala:81)
    at scalaz.Free$$anonfun$map$1.apply(Free.scala:64)
    at scalaz.Free$$anonfun$map$1.apply(Free.scala:64)
    at scalaz.Free.resume(Free.scala:81)
    at scalaz.Free.go2$1(Free.scala:116)
    at scalaz.Free.go(Free.scala:120)
    at scalaz.Free.run(Free.scala:174)

@runarorama runarorama commented Feb 24, 2014

There's definitely asyncing going on here. I suggest adding an await in front of all your processes p:

await(Task(()))(_ => p)

If that helps, remove them one by one to find the one that breaks. Something is too strict here.

@runarorama runarorama commented Feb 24, 2014

OK, that doesn't help. Honestly, I think the bug is in WyeActor, where Task.async is being called. Async will actually call Trampoline.run, which is not stack safe unless the asynchronous task runs in a separate thread.
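
To make that diagnosis concrete, here is a minimal sketch in plain Scala (no scalaz) of what happens when a callback is completed on the caller's thread versus handed to a thread pool. All names (`CallbackStackDemo`, `completeInline`, `completeForked`, `depthAfter`) are hypothetical and only illustrate the mechanism; this is not the WyeActor code.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors}
import java.util.concurrent.atomic.AtomicInteger

object CallbackStackDemo {
  // Hypothetical async step whose result is already available:
  // the callback is invoked right away, on the caller's stack.
  def completeInline(i: Int)(cb: Int => Unit): Unit = cb(i + 1)

  // Same step, but the callback is handed to a thread pool,
  // so each continuation starts from a fresh, shallow stack.
  def completeForked(pool: ExecutorService)(i: Int)(cb: Int => Unit): Unit =
    pool.execute(new Runnable { def run(): Unit = cb(i + 1) })

  // Chains `steps` callbacks through `step` and returns the stack depth
  // observed when the last callback fires.
  def depthAfter(steps: Int)(step: (Int, Int => Unit) => Unit): Int = {
    val done  = new CountDownLatch(1)
    val depth = new AtomicInteger(0)
    def loop(i: Int): Unit =
      if (i >= steps) {
        depth.set(Thread.currentThread().getStackTrace().length)
        done.countDown()
      } else step(i, n => loop(n))
    loop(0)
    done.await()
    depth.get
  }

  // Every continuation runs inside the previous callback's frame.
  def inlineDepth(steps: Int): Int =
    depthAfter(steps)((i, cb) => completeInline(i)(cb))

  // Every continuation is resubmitted to a single-threaded pool.
  def forkedDepth(steps: Int): Int = {
    val pool = Executors.newSingleThreadExecutor()
    try depthAfter(steps)((i, cb) => completeForked(pool)(i)(cb))
    finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(s"inline, 10 steps: ${inlineDepth(10)} frames")
    println(s"inline, 50 steps: ${inlineDepth(50)} frames")
    println(s"forked, 10 steps: ${forkedDepth(10)} frames")
    println(s"forked, 50 steps: ${forkedDepth(50)} frames")
  }
}
```

In the inline variant the depth grows with the number of chained steps, which is the shape of the repeating frames in the stack trace above. In the forked variant each callback begins on the pool thread's own stack, so the chain length does not accumulate.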

@runarorama runarorama commented Feb 24, 2014

Yeah, that fixes it.

In WyeActor.scala:

Task.async[A] { (cb: Throwable \/ A => Unit) =>
  val running = RunningTaskImpl(cb)
  runningTask.set(Some(running))
  if (interrupted.get) interrupt()
  else req.runAsyncInterruptibly(r => runningTask.getAndSet(None).foreach(_ => running.complete(r)), interrupted)
}

I changed that to:

Task.fork(Task.async {
...
})

And the SOE goes away.
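
For readers who want to see the effect of that wrapper outside WyeActor, here is a reduced sketch. It assumes a scalaz-concurrent version from the 7.0/7.1 line, where `Task#run` is available; `ForkedAsyncSketch`, `whereAmI`, and `forked` are hypothetical names, and the task body is a stand-in rather than the actual WyeActor logic.

```scala
import scalaz.{\/, \/-}
import scalaz.concurrent.Task

object ForkedAsyncSketch {
  // Hypothetical task: completes its callback immediately and reports
  // the name of the thread on which that happened.
  val whereAmI: Task[String] =
    Task.async[String] { (cb: (Throwable \/ String) => Unit) =>
      cb(\/-(Thread.currentThread().getName))
    }

  // The same task wrapped in Task.fork, which evaluates it on a
  // thread pool (the default executor here).
  val forked: Task[String] = Task.fork(whereAmI)

  def main(args: Array[String]): Unit = {
    println(s"plain async completed on:  ${whereAmI.run}")
    println(s"forked async completed on: ${forked.run}")
  }
}
```

The plain version completes on whichever thread runs the task, so its continuation nests on that thread's stack. The forked version completes on a pool thread, which is what breaks the nesting.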

Owner Author

@jessitron jessitron commented Feb 24, 2014

Ah sweet!!
