Skip to content

Instantly share code, notes, and snippets.

@sergey-scherbina
Last active April 23, 2019 07:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergey-scherbina/fb7d9768164428fe63ccf66694043dc4 to your computer and use it in GitHub Desktop.
Save sergey-scherbina/fb7d9768164428fe63ccf66694043dc4 to your computer and use it in GitHub Desktop.
// Faster coroutine pipelines
// https://dl.acm.org/citation.cfm?doid=3136534.3110249
// https://github.com/iokasimov/pipeline/blob/master/Control/Pipeline.hs
/** A minimal monad toolkit: the [[Monads.Monad]] type class, method-call
  * syntax for `pure` and `>>=`, and instances for the identity monad and the
  * continuation transformer `ContT`.
  */
object Monads {

  /** Type class describing a monadic type constructor `M`. */
  trait Monad[M[_]] {

    /** Lifts a plain value `a` into `M`. */
    def pure[A](a: A): M[A]

    /** Monadic bind: passes the result of `m` to `f`. */
    def >>=[A, B](m: M[A])(f: A => M[B]): M[B]
  }

  object Monad {

    /** Summons the implicit `Monad[M]` instance that is in scope. */
    def apply[M[_] : Monad]: Monad[M] = implicitly[Monad[M]]
  }

  /** Syntax: `a.pure[M]` lifts `a` using the implicit `Monad[M]`. */
  implicit class MonadPure[A](val a: A) extends AnyVal {
    // Explicit result type: this is a public member, so its signature should
    // not depend on inference.
    def pure[M[_] : Monad]: M[A] = Monad[M].pure(a)
  }

  /** Syntax: `m >>= f` delegates to the implicit `Monad[M]`. */
  implicit class MonadBind[A, M[_]](val m: M[A]) extends AnyVal {
    def >>=[B](f: A => M[B])(implicit M: Monad[M]): M[B] =
      M.>>=(m)(f)
  }

  /** The identity type constructor: `Id[A]` is just `A`. */
  type Id[A] = A

  /** Monad instance for [[Id]]: `pure` returns its argument, bind is function application. */
  implicit val idMonad: Monad[Id] = new Monad[Id] {
    override def pure[A](a: A): Id[A] = a

    override def >>=[A, B](m: Id[A])(f: A => Id[B]): Id[B] = f(m)
  }

  /** Continuation transformer: a computation that, given a continuation
    * `A => M[R]`, produces the final answer `M[R]`.
    */
  type ContT[R, M[_], A] = (A => M[R]) => M[R]

  /** Monad instance for `ContT[R, M, *]`.
    *
    * The result type is spelled out explicitly: implicit definitions with an
    * inferred type are fragile in Scala 2 and rejected by Scala 3.
    *
    * The `Monad[M]` evidence is not used by the implementation below; the
    * context bound is kept so that existing call sites keep compiling.
    */
  implicit def contTMonad[R, M[_] : Monad]: Monad[({type F[A] = ContT[R, M, A]})#F] =
    new Monad[({type F[A] = ContT[R, M, A]})#F] {
      // Hand the value straight to the continuation.
      override def pure[A](a: A): ContT[R, M, A] = k => k(a)

      // Run `m`; when it yields `a`, continue with `f(a)` under the same final continuation `k`.
      override def >>=[A, B](m: ContT[R, M, A])(f: A => ContT[R, M, B]): ContT[R, M, B] =
        k => m(a => f(a)(k))
    }
}
/** Coroutine pipelines encoded with continuations.
  *
  * A [[ContPipes.Pipe]] connects an upstream [[ContPipes.Producer]] to a
  * downstream [[ContPipes.Consumer]] and yields a final `T[R]`. A `Pipeline`
  * is a `ContT` computation whose answer type is such a `Pipe`.
  */
object ContPipes {

  import Monads._

  /** Upstream end: given a consumer for its values, yields the final `T[R]`. */
  final case class Producer[I, T[_], R](produce: Consumer[I, T, R] => T[R]) //extends AnyVal

  /** Downstream end: given a value and the producer to continue with, yields the final `T[R]`. */
  final case class Consumer[O, T[_], R](consume: O => Producer[O, T, R] => T[R]) //extends AnyVal

  /** A stage wired between an upstream producer and a downstream consumer.
    * `A` does not occur in the wrapped function; it is a phantom parameter.
    */
  final case class Pipe[I, O, R, T[_], A](pipe: Producer[I, T, R] => Consumer[O, T, R] => T[R]) extends AnyVal

  /** Helper exposing `Pipe[I, O, R, T, *]` as a one-argument type constructor `F`. */
  type PipeF[I, O, R, T[_]] = {type F[X] = Pipe[I, O, R, T, X]}

  /** A continuation-passing computation producing an `A` whose final answer is a `Pipe`. */
  type Pipeline[I, O, T[_], A, R] = ContT[R, ({type F[X] = Pipe[I, O, R, T, X]})#F, A]

  /** Views a raw continuation-taking function as a `Pipeline`; the function is returned unchanged. */
  def Pipeline[I, O, T[_], A, R]: ((A => Pipe[I, O, R, T, R]) => Pipe[I, O, R, T, R]) =>
    ContT[R, ({type F[X] = Pipe[I, O, R, T, X]})#F, A] =
    run => run

  /** Turns the rest of a pipe (`next`) plus its upstream into a producer:
    * when asked to produce, it runs `next(())` between the captured upstream
    * and the requesting consumer.
    */
  def pause[I, O, R, T[_], A](next: Unit => Pipe[I, O, R, T, A]): Producer[I, T, R] => Producer[O, T, R] =
    upstream =>
      Producer[O, T, R](downstream => next(()).pipe(upstream)(downstream))

  /** Turns the rest of a pipe (`next`) plus its downstream into a consumer:
    * when handed a value and a new upstream, it runs `next(value)` between
    * that upstream and the captured downstream.
    */
  def suspend[I, O, R, T[_], A](next: I => Pipe[I, O, R, T, A]): Consumer[O, T, R] => Consumer[I, T, R] =
    downstream =>
      Consumer[I, T, R](value => upstream => next(value).pipe(upstream)(downstream))

  /** Takes an incoming value from the pipeline. */
  def await[I, O, R, T[_]]: Pipeline[I, O, T, I, R] =
    Pipeline[I, O, T, I, R] { resume =>
      Pipe[I, O, R, T, R] { upstream => downstream =>
        // Ask upstream for a value; the continuation resumes once it arrives.
        upstream.produce(suspend[I, O, R, T, R](resume)(downstream))
      }
    }

  /** Gives the value `v` to the downstream consumer. */
  def `yield`[I, O, R, T[_]](v: O): Pipeline[I, O, T, Unit, R] =
    Pipeline[I, O, T, Unit, R] { resume =>
      Pipe[I, O, R, T, R] { upstream => downstream =>
        // Hand `v` downstream together with a producer that resumes this pipeline.
        downstream.consume(v)(pause[I, O, R, T, R](resume)(upstream))
      }
    }

  /** Pipeline that does nothing: its continuation is replaced by [[end]]. */
  def finish[I, O, T[_] : Monad]: Pipeline[I, O, T, Unit, Unit] =
    Pipeline[I, O, T, Unit, Unit](unit => end[Unit, I, O, T](unit))

  /** Terminal pipe: ignores `b`, the upstream and the downstream, and yields `pure(())`. */
  def end[B, C, D, T[_] : Monad](b: B): Pipe[C, D, Unit, T, Unit] =
    Pipe[C, D, Unit, T, Unit](_ => _ => Monad[T].pure(()))

  /** Runs the effect `e` inside the pipeline and passes its result to the continuation. */
  def impact[I, O, T[_] : Monad, A](e: T[A]): Pipeline[I, O, T, A, Unit] =
    Pipeline[I, O, T, A, Unit] { next =>
      Pipe[I, O, Unit, T, Unit] { upstream => downstream =>
        Monad[T].>>=(e)(result => next(result).pipe(upstream)(downstream))
      }
    }

  /** Composes two pipelines into one: `q` runs with `p` (paused on the outer
    * upstream) as its producer. The continuation of the composed pipeline is
    * not invoked.
    */
  def compose[I, E, A, O, T[_] : Monad](p: Pipeline[I, E, T, Unit, Unit])(
    q: Pipeline[E, O, T, Unit, Unit]): Pipeline[I, O, T, A, Unit] =
    Pipeline[I, O, T, A, Unit] { _ =>
      Pipe[I, O, Unit, T, Unit] { upstream => downstream =>
        val consumingStage = q(unit => end[Unit, E, O, T](unit))
        val feed = pause[I, E, Unit, T, Unit](_ => p(unit => end[Unit, I, E, T](unit)))(upstream)
        consumingStage.pipe(feed)(downstream)
      }
    }

  /** Infix syntax for [[compose]]: `p =>= q`. */
  implicit class PipelineCompose[I, E, T[_]](val p: Pipeline[I, E, T, Unit, Unit]) extends AnyVal {
    def =>=[O, A](q: Pipeline[E, O, T, Unit, Unit])(implicit M: Monad[T]): Pipeline[I, O, T, A, Unit] =
      compose[I, E, A, O, T](p)(q)
  }

  /** Runs the pipeline and returns its result in `T`.
    *
    * NOTE(review): the outermost producer and consumer below simply delegate
    * to themselves, so a pipeline that reaches them (awaiting or yielding
    * outside of any composition) recurses without terminating.
    */
  def runPipeline[I, O, T[_] : Monad, R](p: Pipeline[I, O, T, R, R]): T[R] = {
    // Self-referential stubs standing in for the missing outer stages.
    lazy val source: Producer[I, T, R] =
      Producer[I, T, R](sink => source.produce(sink))
    lazy val drain: Consumer[O, T, R] =
      Consumer[O, T, R](value => upstream => drain.consume(value)(upstream))

    // The final continuation ignores both ends and lifts the result into `T`.
    val assembled = p(result => Pipe[I, O, R, T, R](_ => _ => Monad[T].pure(result)))
    assembled.pipe(source)(drain)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment