Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active August 29, 2015 13:57
Show Gist options
  • Save atamborrino/9690225 to your computer and use it in GitHub Desktop.
Save atamborrino/9690225 to your computer and use it in GitHub Desktop.
import play.api.libs.iteratee._
import play.api.libs.iteratee.Enumerator._
import concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import scala.concurrent.duration._
/**
 * Timeout handed to `Await.result` when the demo stream is run.
 *
 * Written with method-call syntax (`100.seconds`) rather than postfix operator
 * notation so that it compiles without enabling `scala.language.postfixOps`,
 * and given an explicit type as recommended for implicit definitions.
 */
implicit val t: FiniteDuration = 100.seconds
/**
 * Callback invoked by `checkContinue2` each time the iteratee it is driving is
 * in the `Cont` state.
 *
 * `apply` is polymorphic in the iteratee's result type `A`, which is why this is
 * a trait rather than a plain function value.
 *
 * @tparam E element type consumed by the iteratee
 * @tparam S type of the state threaded through successive invocations
 */
trait TreatCont2[E, S] {

  /**
   * Decides what to do with an iteratee that can still accept input.
   *
   * @param loop re-enters the driving loop with an iteratee and a state
   * @param s    the state for this invocation
   * @param it   the iteratee, currently in the `Cont` state
   * @return the iteratee to hand back to the caller of the enumerator
   */
  def apply[A](
    loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]],
    s: S,
    it: Iteratee[E, A]
  ): Future[Iteratee[E, A]]
}
/**
 * Builds an `Enumerator` whose behaviour is delegated to `inner` for as long as
 * the consuming iteratee keeps accepting input.
 *
 * When the enumerator is applied to an iteratee, the iteratee is folded:
 *  - in the `Cont` state, `inner` is called with a handle to re-enter this same
 *    fold, the current state, and the iteratee;
 *  - in the `Done` or `Error` state, an equivalent iteratee is returned as-is.
 *
 * @param s     initial state passed to the first invocation of `inner`
 * @param inner callback that produces the next iteratee (typically by feeding
 *              input and calling the supplied loop function)
 */
def checkContinue2[E, S](s: S)(inner: TreatCont2[E, S]) = new Enumerator[E] {
  def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
    // Recursive driver: `inner` receives this method as its `loop` argument.
    def drive(current: Iteratee[E, A], state: S): Future[Iteratee[E, A]] =
      current.fold {
        case cont @ Step.Cont(_) =>
          inner[A](drive, state, cont.it)
        case Step.Done(result, leftover) =>
          Future.successful(Done(result, leftover))
        case Step.Error(message, input) =>
          Future.successful(Error(message, input))
      }

    drive(it, s)
  }
}
/** Identifier wrapping a plain `Int`; used as the unfolding state by `unfoldPathId`. */
case class PathId(i: Int)
/**
 * Type class for values that carry a `PathId`.
 *
 * `unfoldPathId` requires an instance for its element type and uses it to read
 * the id of the last element it fed to the iteratee.
 *
 * @tparam A the element type an id can be extracted from
 */
trait Evented[A] {
/** Returns the `PathId` attached to `a`. */
def getId(a: A): PathId
}
/** Sample stream element, identified by a `PathId`. */
case class Event(id: PathId)
/** Companion holding the `Evented` type-class instance so it is found via implicit scope. */
object Event {
  /**
   * `Evented` instance for `Event`: the id is simply the `id` field.
   *
   * The type is declared explicitly; an implicit without a declared type makes
   * implicit lookup depend on definition order and is not accepted by Scala 3.
   */
  implicit val evented: Evented[Event] = new Evented[Event] {
    def getId(e: Event): PathId = e.id
  }
}
/**
 * Builds an `Enumerator` by repeatedly asking `f` for the next batch of elements.
 *
 * Starting from `id`, each round calls `f` with the current id:
 *  - `Some((_, batch))`: every element of `batch` is fed to the iteratee via
 *    `runAndGetLastChunk`. The next round uses the id (obtained through
 *    `Evented`) of the last element fed. If the batch produced no element, `f`
 *    is asked again with the same id.
 *  - `None`: the stream ends and the iteratee is returned as it stands.
 *
 * Note that the `PathId` returned by `f` alongside the batch is not consulted;
 * only the ids of the emitted elements drive the iteration.
 *
 * @param id starting id
 * @param f  produces the next batch for a given id, or `None` to stop
 * @param ec execution context on which the future callbacks in this method run
 */
def unfoldPathId[E: Evented](id: PathId)(f: PathId => Future[Option[(PathId, Enumerator[E])]])
    (implicit ec: ExecutionContext): Enumerator[E] =
  checkContinue2(id)(new TreatCont2[E, PathId] {
    def apply[A](
      loop: (Iteratee[E, A], PathId) => Future[Iteratee[E, A]],
      id: PathId,
      it: Iteratee[E, A]
    ): Future[Iteratee[E, A]] =
      // `ec` is passed explicitly: the enclosing file also imports
      // `ExecutionContext.Implicits.global`, so leaving the choice to implicit
      // lookup does not guarantee that the caller-supplied context is the one used.
      f(id).flatMap {
        case Some((_, batch)) =>
          runAndGetLastChunk(batch, it).flatMap {
            case (Some(lastE), nextIt) =>
              loop(nextIt, implicitly[Evented[E]].getId(lastE))
            case (None, _) =>
              // Nothing was emitted: go round again with the unchanged iteratee and id.
              loop(it, id)
          }(ec)
        case None =>
          Future.successful(it)
      }(ec)
  })
/**
 * Takes at most one element from `enum`.
 *
 * The enumerator is run, via `Concurrent.runPartial`, against an iteratee that
 * collects no more than one element.
 *
 * @return the collected element, if any, paired with the enumerator that
 *         `Concurrent.runPartial` hands back for the remaining input
 */
def headOption[E](enum: Enumerator[E]): Future[(Option[E], Enumerator[E])] = {
  // Collects into a list, but `take(1)` caps it at a single element.
  val firstOnly = Enumeratee.take[E](1) &> Iteratee.getChunks[E]

  Concurrent.runPartial(enum, firstOnly).flatMap { partial =>
    val (collector, rest) = partial
    collector.run.map(taken => (taken.headOption, rest))
  }
}
/**
 * Feeds every element of `enum` to `it`, one at a time, and remembers the last
 * element that was fed.
 *
 * Elements are pulled with `headOption` until it reports that none is left.
 *
 * @return the last element fed (`None` when `enum` yielded nothing) together
 *         with the iteratee obtained after feeding all elements
 */
def runAndGetLastChunk[E, A](enum: Enumerator[E], it: Iteratee[E, A]): Future[(Option[E], Iteratee[E, A])] = {
  // `last` starts as None and becomes Some(e) as soon as one element has been fed.
  def drain(
    remaining: Enumerator[E],
    current: Iteratee[E, A],
    last: Option[E]
  ): Future[(Option[E], Iteratee[E, A])] =
    headOption(remaining).flatMap {
      case (Some(element), rest) =>
        current.feed(Input.El(element)).flatMap { fed =>
          drain(rest, fed, Some(element))
        }
      case (None, _) =>
        Future.successful((last, current))
    }

  drain(enum, it, None)
}
// Demo stream scripted by id:
//   id 0 -> events 1, 2, 3
//   id 3 -> events 4, 5, 6
//   id 6 -> end of stream
// `unfoldPathId` continues from the id of the last event it emitted, so the
// rounds are 0 -> 3 -> 6.
val enum: Enumerator[Event] = unfoldPathId[Event](PathId(0)) {
  case PathId(0) =>
    // Tuples are written out explicitly instead of relying on argument auto-tupling.
    Future.successful(Some((PathId(0), Enumerator(Event(PathId(1)), Event(PathId(2)), Event(PathId(3))))))
  case PathId(3) =>
    Future.successful(Some((PathId(0), Enumerator(Event(PathId(4)), Event(PathId(5)), Event(PathId(6))))))
  case PathId(6) =>
    Future.successful(None)
  case other =>
    // Any id outside the script is reported through the Future rather than a MatchError.
    Future.failed(new IllegalArgumentException(s"No events scripted for $other"))
}
// Runs the demo stream into a deliberately slow consumer (one second per event),
// printing the accumulated events after each one, then prints the final result.
// Blocks the calling thread for at most `t`.
println(Await.result(enum |>>> Iteratee.fold(Seq.empty[Event]) { (collected, event) =>
  Thread.sleep(1000)
  val grown = collected :+ event
  println(grown)
  grown
}, t))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment