Created
February 19, 2014 04:16
-
-
Save pchiusano/9086001 to your computer and use it in GitHub Desktop.
Trampolined process representation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package scalaz.stream | |
| import scalaz.concurrent.Task | |
| import scalaz.\/ | |
| /** | |
|  * A process that may issue requests in `F` and emit values of type `O`. | |
|  * The concrete cases (`Halt`, `Await`, `Emit`) are defined in the companion object. | |
|  */ | |
| trait Proc2[+F[_],+O] { | |
|   import Proc2._ | |
|   /** | |
|    * Applies `f` to every emitted value and runs the resulting processes in order, | |
|    * followed by the `flatMap` of the remainder of this process. | |
|    * A `Halt` is returned unchanged; an `Await` applies the `flatMap` to the | |
|    * process its continuation eventually produces. | |
|    */ | |
|   def flatMap[F2[x]>:F[x],O2](f: O => Proc2[F2,O2]): Proc2[F2,O2] = | |
|     this match { | |
|       case h@Halt(_) => h | |
|       case a@Await(_,_) => a.extend(_.flatMap(f)) | |
|       // `Emit` wraps a (values, tail) pair, so the chained sub-processes are | |
|       // installed as the tail of an `Emit` that itself emits nothing. | |
|       case Emit(ht) => Emit[F2,O2](ht.map { case (h,t) => | |
|         (Vector.empty[O2], | |
|          h.map(f).foldRight(t.flatMap(f))((p, acc) => p.append[F2,O2](acc))) | |
|       }) | |
|     } | |
|   /** | |
|    * Runs `p2` once this process terminates normally, i.e. with `Halt(End)`. | |
|    * A `Halt` carrying any other cause is returned unchanged. `p2` is by-name: | |
|    * it is only evaluated when reached, and an exception thrown while | |
|    * evaluating it is handled by `Try`. | |
|    */ | |
|   def append[F2[x]>:F[x],O2>:O](p2: => Proc2[F2,O2]): Proc2[F2,O2] = | |
|     this match { | |
|       case Halt(End) => Try(p2) | |
|       case h@Halt(_) => h | |
|       case Emit(ht) => Emit[F2,O2](ht.map { case (h,t) => (h, t.append(p2)) }) | |
|       case a@Await(_,_) => a.extend(_.append(p2)) | |
|     } | |
| } | |
| /** Constructors and data cases for `Proc2`. */ | |
| object Proc2 { | |
|   /** A terminated process; `e` is the cause (`End` signals normal termination). */ | |
|   case class Halt(e: Throwable) extends Proc2[Nothing,Nothing] | |
|   /** | |
|    * A request `req` together with the continuation `recv`, which receives | |
|    * either a failure or the result of the request and yields the next process. | |
|    */ | |
|   case class Await[+F[_],A,+O]( | |
|     req: F[A], | |
|     recv: (Throwable \/ A) => Task[Proc2[F,O]]) | |
|     extends Proc2[F,O] { | |
|     /** | |
|      * Builds an `Await` on the same request whose continuation applies `f` | |
|      * to the process produced by `recv`. The call to `recv` is wrapped in | |
|      * `Task.suspend`, so it is not run when the new `Await` is constructed. | |
|      */ | |
|     def extend[F2[x]>:F[x],O2](f: Proc2[F,O] => Proc2[F2,O2]): Proc2[F2,O2] = | |
|       Await[F2,A,O2](req, e => Task.suspend(recv(e)).map(f)) | |
|   } | |
|   /** | |
|    * A task producing a chunk of emitted values and the process that follows. | |
|    * Covariant in both parameters, like `Await`. | |
|    */ | |
|   case class Emit[+F[_],+O]( | |
|     uncons: Task[(Seq[O], Proc2[F,O])]) | |
|     extends Proc2[F,O] | |
|   /** | |
|    * Evaluates `p`, turning a non-fatal exception thrown during evaluation | |
|    * into `Halt(e)`. Fatal JVM errors, interrupts and control throwables are | |
|    * not caught and propagate to the caller. | |
|    */ | |
|   private[stream] def Try[F[_],A](p: => Proc2[F,A]): Proc2[F,A] = | |
|     try p | |
|     catch { case scala.util.control.NonFatal(e) => Halt(e) } | |
|   /** The normally-terminated process, `Halt(End)`. */ | |
|   val halt: Halt = Halt(End) | |
|   /** A process that is halted with cause `err`. */ | |
|   def fail(err: Throwable): Proc2[Nothing,Nothing] = | |
|     Halt(err) | |
|   /** A process that emits the single value `o` and is followed by `halt`. */ | |
|   def emit[O](o: O): Proc2[Nothing,O] = | |
|     Emit[Nothing,O](Task.now(Vector(o) -> halt)) | |
|   /** Builds an `Await` from the request `req` and the continuation `recv`. */ | |
|   def await[F[_],A,O](req: F[A])(recv: Throwable \/ A => Task[Proc2[F,O]]): | |
|     Proc2[F,O] = | |
|     Await[F,A,O](req, recv) | |
|   /** | |
|    * Requests `req`; a failure becomes `fail(e)` and a result `a` becomes `emit(a)`. | |
|    */ | |
|   def eval[F[_],O](req: F[O]): Proc2[F,O] = | |
|     Await[F,O,O](req, _.fold( | |
|       e => Task.now(fail(e)), | |
|       a => Task.now(emit(a)) | |
|     )) | |
|   /** | |
|    * Special exception indicating normal termination due to | |
|    * input ('upstream') termination. An `Await` may respond to an `End` | |
|    * by switching to reads from a secondary source. | |
|    */ | |
|   case object End extends Exception { | |
|     // Returns `this` without delegating to the default implementation. | |
|     override def fillInStackTrace = this | |
|   } | |
|   /** | |
|    * Special exception indicating downstream termination. | |
|    * An `Await` should respond to a `Kill` by performing | |
|    * necessary cleanup actions, then halting. | |
|    */ | |
|   case object Kill extends Exception { | |
|     // Returns `this` without delegating to the default implementation. | |
|     override def fillInStackTrace = this | |
|   } | |
| } | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@pchiusano seems ok. I am just curious about cleanup in the case of flatMap and append. Do you think the cleanup doesn't need special handling? I think that here https://gist.github.com/pchiusano/9086001#file-proc2-scala-L20 we will need to ensure that, if Try ends up in Halt(e), the cleanup is run?