Last active
April 23, 2019 07:40
-
-
Save sergey-scherbina/fb7d9768164428fe63ccf66694043dc4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Faster coroutine pipelines
// https://dl.acm.org/citation.cfm?doid=3136534.3110249
// https://github.com/iokasimov/pipeline/blob/master/Control/Pipeline.hs
/** Minimal monad type class together with the `Id` and `ContT` instances and infix syntax. */
object Monads {

  /** Type class describing a monad over the type constructor `M`. */
  trait Monad[M[_]] {
    /** Lifts a plain value into `M`. */
    def pure[A](a: A): M[A]

    /** Monadic bind: passes the value(s) carried by `m` to `f`. */
    def >>=[A, B](m: M[A])(f: A => M[B]): M[B]
  }

  object Monad {
    /** Summons the implicit `Monad[M]` instance that is in scope. */
    def apply[M[_] : Monad]: Monad[M] = implicitly[Monad[M]]
  }

  /** Syntax: `a.pure[M]` lifts `a` into any `M` that has a `Monad` instance. */
  implicit class MonadPure[A](val a: A) extends AnyVal {
    // Explicit result type: public members should not rely on inference.
    def pure[M[_] : Monad]: M[A] = Monad[M].pure(a)
  }

  /** Syntax: infix `m >>= f`, delegating to the implicit `Monad[M]`. */
  implicit class MonadBind[A, M[_]](val m: M[A]) extends AnyVal {
    def >>=[B](f: A => M[B])(implicit M: Monad[M]): M[B] =
      M.>>=(m)(f)
  }

  /** Identity type constructor: `Id[A]` is just `A`. */
  type Id[A] = A

  /** Monad instance for `Id`: `pure` returns its argument and bind is function application. */
  implicit val idMonad: Monad[Id] = new Monad[Id] {
    override def pure[A](a: A): Id[A] = a

    override def >>=[A, B](m: Id[A])(f: A => Id[B]): Id[B] = f(m)
  }

  /** Continuation-passing computation: given a continuation `A => M[R]` it produces `M[R]`. */
  type ContT[R, M[_], A] = (A => M[R]) => M[R]

  /**
   * Monad instance for `ContT[R, M, *]`.
   *
   * The result type is spelled out explicitly: implicit definitions without a declared
   * type are fragile during implicit resolution in Scala 2 and rejected by Scala 3.
   * The `Monad` bound on `M` is not used by the body; it is kept so the signature
   * stays the same for existing callers.
   */
  implicit def contTMonad[R, M[_] : Monad]: Monad[({type F[A] = ContT[R, M, A]})#F] =
    new Monad[({type F[A] = ContT[R, M, A]})#F] {
      // Hand the value straight to the continuation.
      override def pure[A](a: A): ContT[R, M, A] = k => k(a)

      // Run `m`; for each `a` it delivers, run `f(a)` with the outer continuation `k`.
      override def >>=[A, B](m: ContT[R, M, A])(f: A => ContT[R, M, B]): ContT[R, M, B] =
        k => m(a => f(a)(k))
    }
}
/**
 * Continuation-passing coroutine pipelines built on `Monads.ContT`.
 *
 * A `Producer` and a `Consumer` are mutually recursive continuations; a `Pipe` connects an
 * upstream `Producer` to a downstream `Consumer`; a `Pipeline` is a `ContT` over `Pipe`.
 */
object ContPipes {

  import Monads._

  /** Upstream side: given the consumer to feed, yields the final effect `T[R]`. */
  final case class Producer[I, T[_], R](produce: Consumer[I, T, R] => T[R]) //extends AnyVal

  /** Downstream side: given a value and the producer to resume, yields the final effect `T[R]`. */
  final case class Consumer[O, T[_], R](consume: O => Producer[O, T, R] => T[R]) //extends AnyVal

  /**
   * A stage reading `I`s from a producer and writing `O`s to a consumer.
   * `A` does not occur in the wrapped function; it only serves as the slot `ContT` varies.
   */
  final case class Pipe[I, O, R, T[_], A](pipe: Producer[I, T, R] => Consumer[O, T, R] => T[R])
    extends AnyVal

  /** Type-lambda helper exposing `Pipe` as a one-argument type constructor `F`. */
  type PipeF[I, O, R, T[_]] = {type F[X] = Pipe[I, O, R, T, X]}

  /** A pipeline step producing an `A`, expressed as `ContT` over `Pipe`. */
  type Pipeline[I, O, T[_], A, R] = ContT[R, ({type F[X] = Pipe[I, O, R, T, X]})#F, A]

  /** Constructor-like helper: views a continuation-taking function as a `Pipeline` (it is `identity`). */
  def Pipeline[I, O, T[_], A, R]
      : ((A => Pipe[I, O, R, T, R]) => Pipe[I, O, R, T, R]) =>
        ContT[R, ({type F[X] = Pipe[I, O, R, T, X]})#F, A] =
    identity

  /** Wraps the rest of a pipe (`next`) as a producer that resumes it against the given upstream. */
  def pause[I, O, R, T[_], A](next: Unit => Pipe[I, O, R, T, A]): Producer[I, T, R] => Producer[O, T, R] =
    upstream => Producer { downstream =>
      next(()).pipe(upstream)(downstream)
    }

  /** Wraps the rest of a pipe (`next`) as a consumer that resumes it with each received value. */
  def suspend[I, O, R, T[_], A](next: I => Pipe[I, O, R, T, A]): Consumer[O, T, R] => Consumer[I, T, R] =
    downstream => Consumer { received => upstream =>
      next(received).pipe(upstream)(downstream)
    }

  /** Takes the next incoming value from the upstream producer. */
  def await[I, O, R, T[_]]: Pipeline[I, O, T, I, R] =
    Pipeline { continue =>
      Pipe(upstream => downstream => upstream.produce(suspend(continue)(downstream)))
    }

  /** Hands `v` to the downstream consumer. */
  def `yield`[I, O, R, T[_]](v: O): Pipeline[I, O, T, Unit, R] =
    Pipeline { continue =>
      Pipe(upstream => downstream => downstream.consume(v)(pause(continue)(upstream)))
    }

  /** Pipeline that does nothing: it ignores its continuation's input and ends immediately. */
  def finish[I, O, T[_] : Monad]: Pipeline[I, O, T, Unit, Unit] =
    Pipeline(done => end(done))

  /** Terminal pipe: ignores `b`, the producer and the consumer, and returns `pure(())`. */
  def end[B, C, D, T[_] : Monad](b: B): Pipe[C, D, Unit, T, Unit] =
    Pipe(_ => _ => Monad[T].pure(()))

  /** Runs the effect `e` inside the pipeline and passes its result to the continuation. */
  def impact[I, O, T[_] : Monad, A](e: T[A]): Pipeline[I, O, T, A, Unit] =
    Pipeline { continue =>
      Pipe { upstream => downstream =>
        e >>= (result => continue(result).pipe(upstream)(downstream))
      }
    }

  /**
   * Composes two pipelines into one: `q` is run with `p` (paused on the outer upstream)
   * as its producer. The continuation of the composed pipeline is not used.
   */
  def compose[I, E, A, O, T[_] : Monad](p: Pipeline[I, E, T, Unit, Unit])(
      q: Pipeline[E, O, T, Unit, Unit]): Pipeline[I, O, T, A, Unit] =
    Pipeline { _ =>
      Pipe { upstream => downstream =>
        val second = q(end(_))
        val first = pause(_ => p(end(_)))(upstream)
        second.pipe(first)(downstream)
      }
    }

  /** Infix syntax for [[compose]]: `p =>= q`. */
  implicit class PipelineCompose[I, E, T[_]](val p: Pipeline[I, E, T, Unit, Unit]) extends AnyVal {
    def =>=[O, A](q: Pipeline[E, O, T, Unit, Unit])(implicit M: Monad[T]): Pipeline[I, O, T, A, Unit] =
      compose[I, E, A, O, T](p)(q)
  }

  /**
   * Runs a pipeline and returns its result in `T`.
   *
   * The outermost producer and consumer are self-referential stubs: each simply calls
   * itself again and has no base case, so they only make sense for pipelines that never
   * actually reach past their own ends.
   */
  def runPipeline[I, O, T[_] : Monad, R](p: Pipeline[I, O, T, R, R]): T[R] = {
    lazy val source: Producer[I, T, R] =
      Producer(consumer => source.produce(consumer))
    lazy val sink: Consumer[O, T, R] =
      Consumer(value => producer => sink.consume(value)(producer))

    // The final continuation ignores both ends and lifts the result into `T`.
    val whole = p(result => Pipe(_ => _ => Monad[T].pure(result)))
    whole.pipe(source)(sink)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment