Skip to content

Instantly share code, notes, and snippets.

@gigiigig
Last active October 5, 2015 06:52
Show Gist options
  • Save gigiigig/f027c76f9ff758ba2450 to your computer and use it in GitHub Desktop.
Save gigiigig/f027c76f9ff758ba2450 to your computer and use it in GitHub Desktop.
Processes
import scalaz.concurrent.Task
import scalaz.concurrent.Strategy._
object console extends App {

  /** Constructors for [[Process]] values. */
  object Process {

    /**
     * Builds a process that waits for a single input, emits `f` applied to it,
     * and then halts. If the input is already exhausted it halts without
     * emitting anything.
     *
     * @param f the function applied to the one consumed element
     * @tparam I the input element type
     * @tparam O the output element type
     */
    def liftOne[I, O](f: I => O): Process[I, O] = Await {
      case Some(i) => Emit(f(i))
      case None    => Halt()
    }
  }

  /**
   * A stream transducer: a state machine that turns a `Stream[I]` into a
   * `Stream[O]`. It is always in one of three states: [[Await]] (wants the
   * next input), [[Emit]] (has an output ready) or [[Halt]] (finished).
   *
   * @tparam I the input element type
   * @tparam O the output element type
   */
  sealed trait Process[I, O] {

    /**
     * Drives this process over the input stream `s`.
     *
     * @param s the input elements
     * @return the elements emitted by the process, in order
     */
    def apply(s: Stream[I]): Stream[O] = this match {
      case Halt() => Stream()
      case Await(recv) =>
        s match {
          case h #:: t => recv(Some(h))(t)
          // Input is exhausted: signal that with None and keep passing the
          // (empty) stream along.
          case xs => recv(None)(xs)
        }
      case Emit(h, t) => h #:: t(s)
    }

    /**
     * Returns a process that restarts this one from the beginning every time
     * it halts, until the input runs out.
     *
     * A process that is already `Halt()` has nothing to restart, so it is
     * returned as is; without this guard the restart step would call itself
     * on `Halt()` forever.
     *
     * NOTE: a process that can reach `Halt()` without ever awaiting input
     * (for example `Emit(x)`) would repeat infinitely; because the tail of
     * [[Emit]] is evaluated strictly, building that result still recurses
     * without bound.
     */
    def repeat: Process[I, O] = {
      def go(p: Process[I, O]): Process[I, O] = p match {
        // The current run finished: start over from the original process.
        case Halt() => go(this)
        case Await(recv) =>
          Await {
            // End of input: let the wrapped process finish, do not restart.
            case None => recv(None)
            case i    => go(recv(i))
          }
        case Emit(h, t) => Emit(h, go(t))
      }

      this match {
        case Halt() => this
        case _      => go(this)
      }
    }
  }

  /**
   * State that requests the next input. `recv` is given `Some(element)`, or
   * `None` when the input is exhausted, and returns the next state.
   */
  final case class Await[I, O](recv: Option[I] => Process[I, O]) extends Process[I, O]

  /** Terminal state: emits nothing more and consumes nothing more. */
  final case class Halt[I, O]() extends Process[I, O]

  /** State that outputs `head` and then continues as `tail` (halts by default). */
  final case class Emit[I, O](head: O, tail: Process[I, O] = Halt[I, O]())
      extends Process[I, O]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment