Skip to content

Instantly share code, notes, and snippets.

@YoEight
Last active December 19, 2015 18:48
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save YoEight/6001197 to your computer and use it in GitHub Desktop.
Save YoEight/6001197 to your computer and use it in GitHub Desktop.
Scalaz-stream Process to Play Enumerator
package deiko
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scalaz.concurrent.Task
import scalaz.stream._
import play.api.libs.iteratee._
import Process._
/** Conversions from scalaz-stream processes to Play iteratee types. */
object Conversion {

  /**
   * Adapts a scalaz-stream `Process[Task, E]` into a Play `Enumerator[E]`.
   *
   * When the enumerator is applied to an iteratee, the process is stepped one
   * node at a time:
   *  - `Emit`  : the emitted values are fed to the iteratee one by one;
   *  - `Await` : the request is evaluated with `req.run` and the result is
   *              passed to `recv` to obtain the next process state;
   *  - `Halt`  : `Input.EOF` is fed to the iteratee.
   *
   * As soon as the iteratee is no longer in the `Cont` state, the remaining
   * process is terminated via `Process#kill` and the resulting cleanup process
   * is drained before the iteratee is returned.
   *
   * NOTE(review): `req.run` is invoked directly inside the stepping functions;
   * presumably this blocks the calling thread until the task completes -
   * confirm against the scalaz `Task` documentation.
   *
   * @param proc the source process to expose as an enumerator
   * @tparam E   the element type produced by the process
   * @return an enumerator that pushes the elements of `proc` into an iteratee
   */
  def sourceToEnumerator[E](proc: Process[Task, E]): Enumerator[E] = new Enumerator[E] {

    def apply[A](start: Iteratee[E, A]): Future[Iteratee[E, A]] = {

      /**
       * Feeds the pending values `xs` to `i` one at a time, then resumes
       * stepping `next` once every value has been consumed.
       */
      def feed(xs: Seq[E], next: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] = {
        // Inspects the iteratee obtained after feeding the head element.
        def step(fed: Iteratee[E, A]): Future[Iteratee[E, A]] =
          fed.fold {
            case Step.Cont(_) => feed(xs.tail, next, fed)
            // The iteratee is finished: terminate the source so that only its
            // cleanup actions run. Draining `next` itself (without `.kill`)
            // would evaluate the whole remaining stream and never return for
            // an infinite source. This mirrors the non-Cont branch of `go`.
            case _ => kill(next.kill, fed)
          }

        if (xs.isEmpty) go(next, i)
        else step(Iteratee.flatten(i.feed(Input.El(xs.head))))
      }

      /**
       * Runs `proc` to completion while discarding everything it emits, then
       * returns `i` unchanged. Callers pass an already-killed process so that
       * this only executes cleanup actions.
       */
      @annotation.tailrec
      def kill(proc: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] = proc match {
        case Halt => Future.successful(i)
        case Emit(_, next) => kill(next, i)
        case Await(req, recv, fallback, error) =>
          val next = try recv(req.run) catch {
            // `Process.End` signals normal exhaustion of the input: continue with the fallback.
            case Process.End => fallback
            case e: Exception => error match {
              // No error handler is registered: propagate the failure as is.
              case Halt => throw e
              // Run the error handler first, then re-raise the original failure.
              case _ => error ++ Process.wrap(Task.delay(throw e))
            }
          }
          kill(next, i)
      }

      /**
       * Main driver: advances `proc` by one node for as long as the iteratee
       * `i` is in the `Cont` state.
       */
      def go(proc: Process[Task, E], i: Iteratee[E, A]): Future[Iteratee[E, A]] = {
        i.fold {
          case Step.Cont(k) => proc match {
            case Emit(values, next) => feed(values, next, i)
            // The source is exhausted: signal EOF, then re-inspect the iteratee.
            // NOTE(review): if the iteratee is still `Cont` after EOF this
            // branch feeds EOF again - confirm that this is intended.
            case Halt => go(proc, k(Input.EOF))
            case Await(req, recv, fallback, error) =>
              val next = try recv(req.run) catch {
                // `Process.End` signals normal exhaustion of the input: continue with the fallback.
                case Process.End => fallback
                case e: Exception => error match {
                  // No error handler is registered: propagate the failure as is.
                  case Halt => throw e
                  // Run the error handler first, then re-raise the original failure.
                  case _ => error ++ Process.wrap(Task.delay(throw e))
                }
              }
              go(next, i)
          }
          // The iteratee is Done or in Error: stop the source and run its cleanup.
          case _ => kill(proc.kill, i)
        }
      }

      go(proc, start)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment