Skip to content

Instantly share code, notes, and snippets.

@jessitron
Created February 24, 2014 16:24
Show Gist options
  • Save jessitron/9191499 to your computer and use it in GitHub Desktop.
Save jessitron/9191499 to your computer and use it in GitHub Desktop.
StackOverflow in scalaz-stream. My trampoline is not bouncy enough
import scalaz.stream._
import Process._
import scalaz.concurrent.Task
import scala.concurrent.duration._
// git bisect identifies the offending commit as
// https://github.com/scalaz/scalaz-stream/commit/721716ed7af0c126593e9ee227c0f36f21c5b7ed
object Test {
def main(args: Array[String]) {
// this could be a constant process, this doesn't appear to be required
val triggers: Process[Task, Duration] = Process.awakeEvery(1.second).take(10)
// this can also be a constant, delay not required
val randomStrings: Process[Task, String] = Process.eval{ Task.delay{ "boo!" }}.repeat.take(50)
// no SOE without the tee AND the every -- removing one fixes it
val slowStrings: Process[Task,String] = Process.every(400.millis).tee(randomStrings)(tee.when)
// the suspicious wye
def combined: Process[Task, Any] = slowStrings.wye(triggers)(wye.merge[Any])
// SOE. Free monads all over the stack, entering go() each time
combined.runLog.run
}
}
@runarorama
Copy link

Yeah, that fixes it.

In WyeActor.scala:

Task.async[A] { (cb: Throwable \/ A => Unit) =>
          val running = RunningTaskImpl(cb)
          runningTask.set(Some(running))
          if (interrupted.get) interrupt()
          else req.runAsyncInterruptibly(r => runningTask.getAndSet(None).foreach(_ => running.complete(r)), interrupted)
        }

I changed that to:

Task.fork(Task.async {
...
})

And the SOE goes away.

@jessitron
Copy link
Author

Ah sweet!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment