-
-
Save xuwei-k/9422569 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scalaz._
import scalaz.concurrent.Strategy
import \/._
import Free._
import scalaz.syntax.monad._
object Experiment { | |
/**
 * The instruction set of an `IO` program: the functor that is plugged into
 * `Free` to build the `IO` representation.
 *
 * Sealed, so all instructions (`Async`, `Step`, `Error`, `Strat`) are declared
 * in this file.
 *
 * @tparam A the type of value the instruction yields
 */
sealed trait OI[A] {

  /**
   * Transforms the value yielded by this instruction.
   *
   * @param f function applied to the yielded value
   * @return an instruction of the same kind yielding the transformed value
   */
  def map[B](f: A => B): OI[B]
}
/**
 * A callback-driven instruction.
 *
 * @param k registration function; it is handed a continuation that turns the
 *          eventual result into a `Trampoline[Unit]`
 */
case class Async[A](k: (A => Trampoline[Unit]) => Unit) extends OI[A] {

  /** Pre-composes `f` onto the continuation before passing it to `k`. */
  def map[B](f: A => B): OI[B] =
    Async[B] { callback => k(callback compose f) }
}
/**
 * A deferred, synchronous instruction.
 *
 * @param a thunk that produces the value when invoked
 */
case class Step[A](a: () => A) extends OI[A] {

  /** Builds a new thunk that applies `f` to the result of `a`; nothing runs until it is invoked. */
  def map[B](f: A => B): OI[B] = {
    val mapped: () => B = () => f(a())
    Step(mapped)
  }
}
/**
 * A failed instruction.
 *
 * @param e the failure being carried
 */
case class Error[A](e: Throwable) extends OI[A] {

  /** There is no value to transform, so `f` is ignored and the same failure is re-tagged as `OI[B]`. */
  def map[B](f: A => B): OI[B] = Error[B](e)
}
/**
 * An instruction whose value depends on a `Strategy`.
 *
 * @param k function from the strategy in effect to the yielded value
 */
case class Strat[A](k: Strategy => A) extends OI[A] {

  /** Applies `f` to whatever `k` produces once a strategy is supplied. */
  def map[B](f: A => B): OI[B] = Strat[B](k andThen f)
}
/** Companion of `OI`; hosts its type-class instances so they are found through implicit scope. */
object OI {

  /** `Functor` instance that delegates to each instruction's own `map`. */
  implicit val oiFunctor: Functor[OI] = new Functor[OI] {
    def map[A, B](oi: OI[A])(f: A => B): OI[B] = oi.map(f)
  }
}
// Internal representation of `IO`: a free monad over the `OI` instruction set.
private type Rep[A] = Free[OI, A]
/**
 * A computation described as a `Free[OI, A]` program.
 *
 * @param rep the underlying program; accessible only from this class and its companion
 * @tparam A the type of the computation's result
 */
class IO[A](private val rep: Rep[A]) {

  /**
   * Sequences this computation with the one `f` builds from its result.
   *
   * @param f continuation producing the next computation
   */
  def flatMap[B](f: A => IO[B]): IO[B] = {
    val chained: Rep[B] = rep.flatMap(a => f(a).rep)
    new IO[B](chained)
  }

  /**
   * Transforms the result with `f`; implemented as a `flatMap` into a pure `Return`.
   *
   * @param f function applied to the result
   */
  def map[B](f: A => B): IO[B] =
    flatMap { a => new IO[B](Return[OI, B](f(a))) }

  /**
   * Reifies failure into the result: an `Error` instruction becomes a left
   * value, a normal result becomes a right value. Every other instruction is
   * kept, with the same translation applied to whatever it continues with.
   */
  def attempt: IO[Throwable \/ A] = {
    def loop(current: Free[OI, A]): Free[OI, Throwable \/ A] =
      current.resume match {
        case \/-(value)        => Return(right(value))
        case -\/(Error(cause)) => Return(left(cause))
        case -\/(suspended)    => Suspend(suspended.map(loop))
      }

    new IO(loop(rep))
  }

  /**
   * UNSAFE! Invokes the leading `Step` thunks right now, on the calling
   * thread, and stops at the first state that is not a `Step`; the program at
   * that point is returned.
   */
  def step: IO[A] = {
    def advance(program: Rep[A]): Rep[A] =
      program.resume match {
        case -\/(Step(next)) => advance(next())
        case _               => program
      }

    new IO(advance(rep))
  }
}
/** Constructors, combinators and the `Monad` instance for `IO`. */
object IO {

  /**
   * Utility function - evaluates `a` immediately; its value becomes a
   * successful `IO` and any thrown exception becomes a failed one.
   *
   * NOTE(review): this catches every `Throwable`, fatal VM errors included.
   * Narrowing it to non-fatal errors would change what `attempt` can observe,
   * so it is left as is - confirm that this is intended.
   *
   * @param a expression to evaluate
   */
  def Try[A](a: => A): IO[A] =
    try now(a) catch { case e: Throwable => fail(e) }

  /**
   * An `IO` that always fails with the given error.
   *
   * @param e the failure to carry
   */
  def fail[A](e: Throwable): IO[A] =
    new IO(Suspend[OI, A](Error[Rep[A]](e)))

  /**
   * An `IO` that always succeeds with the given, already computed value.
   *
   * @param a the result
   */
  def now[A](a: A): IO[A] =
    new IO(Return[OI, A](a))

  /**
   * An `IO` that evaluates the given expression when its `Step` is run.
   *
   * The expression is evaluated through `Try`, so an exception thrown by `a`
   * becomes an `Error` instruction (observable via `attempt`) instead of
   * escaping from whoever runs the step. This matches `suspend`.
   *
   * @param a expression to evaluate lazily
   */
  def delay[A](a: => A): IO[A] =
    new IO(Suspend[OI, A](Step[Rep[A]](() => Try(a).rep)))

  /**
   * An `IO` that suspends evaluation of the given `IO`.
   * A trampolining primitive, helpful for recursive definitions.
   *
   * @param a expression building the `IO` to run; exceptions thrown while
   *          building it are captured by `Try`
   */
  def suspend[A](a: => IO[A]): IO[A] =
    new IO(Suspend[OI, A](Step[Rep[A]](() => Try(a).join.rep)))

  /**
   * Evaluate the given expression asynchronously in a new logical thread.
   *
   * The evaluation is submitted to the `Strategy` in effect (see
   * `withStrategy`). Exceptions thrown by `a` are captured as `Error`
   * instructions rather than being thrown on the strategy's thread.
   *
   * @param a expression to evaluate
   */
  def apply[A](a: => A): IO[A] =
    new IO(Suspend[OI, A](Strat[Rep[A]] { strategy =>
      Suspend[OI, A](Async[Rep[A]] { cb =>
        // Hand the work to the strategy. The thunk it returns is not needed:
        // the result is delivered through `cb`.
        strategy(cb(Try(a).rep).run)
        ()
      })
    }))

  /**
   * Explicitly fork the given `IO` as a new logical thread.
   *
   * @param a expression building the `IO` to fork
   */
  def fork[A](a: => IO[A]): IO[A] =
    apply(a).join

  /**
   * Turn a callback-accepting function into an `IO`. Helpful when working with
   * APIs that expect explicit registering of callbacks.
   *
   * @param register receives a callback that must be invoked with either the
   *                 failure (left) or the result (right)
   */
  def async[A](register: ((Throwable \/ A) => Unit) => Unit): IO[A] =
    new IO(Suspend[OI, A](Async[Rep[A]] { cb =>
      register {
        // `cb` only builds a trampoline; it has to be run for the continuation to fire.
        case -\/(e) => cb(Suspend[OI, A](Error[Rep[A]](e))).run
        case \/-(a) => cb(Return[OI, A](a)).run
      }
    }))

  /**
   * Locally change the parallelization strategy for the given `IO`.
   *
   * Every `Strat` instruction in `io` is rewritten to ignore the strategy it
   * is given and use `s` instead; all other instructions pass through unchanged.
   *
   * @param s  strategy to use inside `io`
   * @param io computation to rewrite
   */
  def withStrategy[A](s: Strategy)(io: IO[A]): IO[A] =
    new IO(io.rep.mapSuspension(new (OI ~> OI) {
      def apply[T](oi: OI[T]): OI[T] = oi match {
        case Strat(k) => Strat(_ => k(s))
        case other    => other
      }
    }))

  /** `Monad` instance for `IO`: `bind` is `flatMap` and `point` is `delay`. */
  implicit val ioMonad: Monad[IO] = new Monad[IO] {
    def bind[A, B](io: IO[A])(f: A => IO[B]): IO[B] = io.flatMap(f)
    def point[A](a: => A): IO[A] = delay(a)
  }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment