StackOverflow in scalaz-stream. My trampoline is not bouncy enough
import scalaz.stream._
import Process._
import scalaz.concurrent.Task
import scala.concurrent.duration._
// git bisect identifies the offending commit as
// https://github.com/scalaz/scalaz-stream/commit/721716ed7af0c126593e9ee227c0f36f21c5b7ed
object Test {
  def main(args: Array[String]): Unit = {
    // This could be a constant process; the timer does not appear to be required.
    val triggers: Process[Task, Duration] = Process.awakeEvery(1.second).take(10)
    // This can also be a constant; the delay is not required.
    val randomStrings: Process[Task, String] = Process.eval { Task.delay { "boo!" } }.repeat.take(50)
    // No SOE without both the tee AND the every: removing either one fixes it.
    val slowStrings: Process[Task, String] = Process.every(400.millis).tee(randomStrings)(tee.when)
    // The suspicious wye.
    def combined: Process[Task, Any] = slowStrings.wye(triggers)(wye.merge[Any])
    // StackOverflowError: Free monads all over the stack, entering go() each time.
    combined.runLog.run
  }
}

@runarorama runarorama commented Feb 24, 2014

Stack trace, for reference:

    at scalaz.Free.run(Free.scala:174)
    at scalaz.concurrent.Future$$anonfun$async$1$$anonfun$apply$12.apply(Future.scala:321)
    at scalaz.concurrent.Future$$anonfun$async$1$$anonfun$apply$12.apply(Future.scala:321)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1$$anonfun$apply$2.apply(WyeActor.scala:91)
    at scala.Option.foreach(Option.scala:236)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1$$anonfun$apply$1.apply(WyeActor.scala:91)
    at scalaz.concurrent.Future$$anonfun$runAsyncInterruptibly$1.apply(Future.scala:153)
    at scalaz.concurrent.Future$$anonfun$runAsyncInterruptibly$1.apply(Future.scala:153)
    at scalaz.concurrent.Future$class.listenInterruptibly(Future.scala:92)
    at scalaz.concurrent.Future$Now.listenInterruptibly(Future.scala:197)
    at scalaz.concurrent.Future$class.runAsyncInterruptibly(Future.scala:153)
    at scalaz.concurrent.Future$Now.runAsyncInterruptibly(Future.scala:197)
    at scalaz.concurrent.Task.runAsyncInterruptibly(Task.scala:103)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1.apply(WyeActor.scala:91)
    at scalaz.stream.actor.WyeActor$$anonfun$onAwait$1$1.apply(WyeActor.scala:87)
    at scalaz.concurrent.Future$$anonfun$async$1.apply(Future.scala:321)
    at scalaz.concurrent.Future$$anonfun$async$1.apply(Future.scala:321)
    at scalaz.concurrent.Future$class.listen(Future.scala:81)
    at scalaz.concurrent.Future$BindSuspend.listen(Future.scala:200)
    at scalaz.concurrent.Future$$anonfun$listen$1$$anonfun$apply$4.apply(Future.scala:81)
    at scalaz.concurrent.Future$$anonfun$listen$1$$anonfun$apply$4.apply(Future.scala:81)
    at scalaz.Free$$anonfun$map$1.apply(Free.scala:64)
    at scalaz.Free$$anonfun$map$1.apply(Free.scala:64)
    at scalaz.Free.resume(Free.scala:81)
    at scalaz.Free.go2$1(Free.scala:116)
    at scalaz.Free.go(Free.scala:120)
    at scalaz.Free.run(Free.scala:174)

@runarorama runarorama commented Feb 24, 2014

There's definitely asyncing going on here. I suggest adding an await in front of all your processes p:

await(Task(()))(_ => p)

If that helps, remove them one by one to find the one that breaks. Something is too strict here.


@runarorama runarorama commented Feb 24, 2014

OK, that doesn't help. Honestly, I think the bug is in WyeActor, where Task.async is being called. Async will actually call Trampoline.run, which is not stack safe unless the asynchronous task runs in a separate thread.
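
To illustrate the stack-safety point, here is a minimal plain-Scala sketch. It is not the scalaz or WyeActor code, and `CallbackDepth`, `chain`, and `depth` are hypothetical names made up for this example. It chains callbacks two ways: invoking each one synchronously on the calling thread, and handing each one to an executor, which roughly models what forking onto another thread does. It records the deepest stack seen in each case.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger

object CallbackDepth {
  // Number of stack frames currently on the calling thread.
  def depth(): Int = Thread.currentThread().getStackTrace.length

  // Runs `steps` chained callbacks; `submit` decides how each next
  // callback is started. Returns the deepest stack observed.
  def chain(steps: Int)(submit: Runnable => Unit): Int = {
    val done = new CountDownLatch(1)
    val maxDepth = new AtomicInteger(0)

    def step(n: Int): Unit = {
      maxDepth.set(math.max(maxDepth.get, depth()))
      if (n == 0) done.countDown()
      else submit(new Runnable { def run(): Unit = step(n - 1) })
    }

    step(steps)
    done.await()
    maxDepth.get
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      // Synchronous: every callback runs on top of the previous one's frames.
      val sync = chain(100)(r => r.run())
      // Handed off: every callback starts from the worker thread's shallow stack.
      val forked = chain(100)(r => pool.execute(r))
      println(s"synchronous: $sync frames, handed off: $forked frames")
    } finally pool.shutdown()
  }
}
```

In the synchronous case the depth grows with the number of callbacks, so a long enough chain overflows the stack. With the handoff, each callback returns before the next one starts, so the depth stays bounded regardless of chain length.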


@runarorama runarorama commented Feb 24, 2014

Yeah, that fixes it.

In WyeActor.scala:

Task.async[A] { (cb: Throwable \/ A => Unit) =>
  val running = RunningTaskImpl(cb)
  runningTask.set(Some(running))
  if (interrupted.get) interrupt()
  else req.runAsyncInterruptibly(r => runningTask.getAndSet(None).foreach(_ => running.complete(r)), interrupted)
}

I changed that to:

Task.fork(Task.async {
...
})

And the SOE goes away.

Owner Author

@jessitron jessitron commented Feb 24, 2014

Ah sweet!!
