Skip to content

Instantly share code, notes, and snippets.

@sgodbillon
Created December 10, 2012 13:52
Show Gist options
  • Save sgodbillon/4250672 to your computer and use it in GitHub Desktop.
Save sgodbillon/4250672 to your computer and use it in GitHub Desktop.
Bug Enumerator (StackOverflowError in a Promise)
package bugs
import play.api.libs.iteratee._
import scala.util.Failure
import scala.util.Success
import scala.concurrent.Future
import scala.concurrent.Promise
object StackOverflowErrorBug {
import scala.concurrent.ExecutionContext.Implicits.global
trait Cursor {
def iterator :Iterator[String]
def hasNext :Boolean
def next :Future[Cursor]
def i: Int = 0
def n(cursor: Cursor) =
if(cursor.iterator.hasNext) {
Future(Some((cursor,Some(cursor.iterator.next))))
} else if (cursor.hasNext) {
val fut = //cursor.next.map(c => Some((c,None)))
Future(Some(DefaultCursor(cursor.i + 1) -> None))
print(fut + ",")
fut
} else {
Future(None)
}
def enumerate = {
CustomEnumerator.unfoldM(this) { cursor =>
n(cursor)
}.andThen(Enumerator.eof).onDoneEnumerating{
println("done")
} &> Enumeratee.collect {
case Some(e) => e
}
}
object CustomEnumerator {
def unfoldM[S,E](s:S)(f: S => Future[Option[(S,E)]] ): Enumerator[E] = checkContinue1(s)(new TreatCont1[E,S]{
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]] = f(s).flatMap {
case Some((newS,e)) => {
// if we don't create this intermediate promise, then a stackoverflowerror is eventually thrown
// original code ->
// loop(k(Input.El(e)),newS)
// <- original code
val promise = Promise[play.api.libs.iteratee.Iteratee[E,A]]()
loop(k(Input.El(e)),newS).onComplete {
case Success(s) =>
promise.success(s)
case Failure(f) =>
promise.failure(f)
}
promise.future
}
case None => Future(Cont(k))
}
})
trait TreatCont1[E,S]{
def apply[A](loop: (Iteratee[E,A],S) => Future[Iteratee[E,A]], s:S, k: Input[E] => Iteratee[E,A]):Future[Iteratee[E,A]]
}
def checkContinue1[E,S](s:S)(inner:TreatCont1[E,S]) = new Enumerator[E] {
def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
def step(it: Iteratee[E, A], state:S): Future[Iteratee[E,A]] = it.fold{
case Step.Done(a, e) => Future(Done(a,e))
case Step.Cont(k) => inner[A](step,state,k)
case Step.Error(msg, e) => Future(Error(msg,e))
}
step(it,s)
}
}
def loop(cursor: Cursor) :Future[Option[(Option[String], Cursor)]] = {
if(cursor.iterator.hasNext)
Future(Some(Some(cursor.iterator.next) -> cursor))
else if(cursor.hasNext)
cursor.next.map(c => Some(None -> c))
else Future(None)
}
}
}
case class DefaultCursor(override val i: Int) extends Cursor {
val iterator = {
val r = (for(j <- 0 to 1) yield i + "" + j)
r.toIterator
}
def hasNext = i < 5000
def next = {
Future(DefaultCursor(i + 1))
}
}
case class FlattenedCursor(cursor: Future[Cursor]) extends Cursor {
val iterator = Iterator.empty
def hasNext = true
def next = cursor
}
// should print "done: <some result>" at the end
def test = {
val enumerator = FlattenedCursor(Future(DefaultCursor(0))).enumerate
val fut = enumerator.apply(Iteratee.foreach({ e =>
//println(e)
}))
val ff = Iteratee.flatten(fut).run
ff.onComplete {
case e =>
println("done: " + e)
}
}
}
name := "TestIteratees"
version := "1.0"
scalaVersion := "2.10.0-RC1"
resolvers += "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/"
resolvers += "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies ++= Seq(
"play" % "play-iteratees_2.10" % "2.1-RC1"
)
@viktorklang
Copy link

You can most likely replace the following:

val promise = Promiseplay.api.libs.iteratee.Iteratee[E,A]
loop(k(Input.El(e)),newS).onComplete {
case Success(s) =>
promise.success(s)
case Failure(f) =>
promise.failure(f)
}
promise.future

with

Promise[play.api.libs.iteratee.Iteratee[E,A]]().completeWith(loop(k(Input.El(e)),newS)).future

@sgodbillon
Copy link
Author

Tried it, sadly, completeWith also fails :/

@sadache
Copy link

sadache commented Jan 6, 2013

does this count as a valid (but minimal) test?

Enumerator.unfoldM(0){ (i => Future(Option((i+1,i)).filterNot(_ => i > 5000 )))} |>>> Iteratee.getChunks

@sadache
Copy link

sadache commented Jan 6, 2013

not minimal enough? here might be a smaller one:

def tata(f:Future[Int]):Future[Int] = f.flatMap(i => if (i < 0)  tata(Future(i+1)) else Future(i))
// small enough number, -750 is enough in the console
tata(-1000)

java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:58)
at scala.concurrent.Promise$class.failure(Promise.scala:107)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:58)
at scala.concurrent.Future$$anonfun$flatMap$1.liftedTree3$1(Future.scala:283)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:277)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:274)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:29)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:915)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:980)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

@sgodbillon
Copy link
Author

You're right :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment