Skip to content

Instantly share code, notes, and snippets.

@sergey-scherbina
Created April 8, 2019 22:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sergey-scherbina/f27de17dffec64f7a1d2ce5d39be374f to your computer and use it in GitHub Desktop.
Save sergey-scherbina/f27de17dffec64f7a1d2ce5d39be374f to your computer and use it in GitHub Desktop.
/*
https://github.com/rizo/streams/blob/master/src/coroutine.ml
https://pusher.com/sessions/meetup/the-realtime-guild/realtime-stream-processing-with-coroutines
*/
/**
 * Coroutine-style stream processing built from three states: a pipe either
 * yields an output, awaits an input, or is ready with a final result.
 */
object Pipes {
  import scala.annotation.tailrec

  /**
   * A computation that may consume values of type `I`, emit values of type `O`
   * and eventually finish with a result of type `R`.
   *
   * @tparam I type of the values the pipe awaits
   * @tparam O type of the values the pipe yields
   * @tparam R type of the final result
   */
  sealed trait Pipe[I, O, R] {

    /**
     * Sequences this pipe with a continuation: once this pipe is [[Ready]],
     * its result is handed to `f` and the returned pipe takes over.
     * Yields and awaits of this pipe are kept as they are.
     */
    final def flatMap[T](f: R => Pipe[I, O, T]): Pipe[I, O, T] = this match {
      case Yield(out, rest) => Yield(out, () => rest().flatMap(f))
      case Await(k)         => Await(in => k(in).flatMap(f))
      case Ready(r)         => f(r)
    }

    /** Transforms the final result with `f`, leaving yields and awaits untouched. */
    final def map[T](f: R => T): Pipe[I, O, T] =
      flatMap[T](r => Ready[I, O, T](f(r)))

    /**
     * Connects an upstream pipe `p` to this (downstream) pipe: values yielded
     * by `p` are fed into the awaits of this pipe.
     *
     *  - downstream ready: the composition is ready with that result;
     *  - downstream yields: the value is emitted and composition continues;
     *  - downstream awaits, upstream yields: the value is passed downstream;
     *  - both await: the composition awaits an input for the upstream;
     *  - downstream awaits, upstream ready: the composition is ready with the
     *    upstream result.
     */
    final def compose[J](p: Pipe[J, I, R]): Pipe[J, O, R] = (this, p) match {
      case (Ready(r), _)                 => Ready(r)
      case (Yield(out, down), _)         => Yield(out, () => down() compose p)
      case (Await(feed), Yield(in, up))  => feed(in) compose up()
      case (Await(_), Await(upstreamIn)) => Await(j => compose(upstreamIn(j)))
      case (Await(_), Ready(r))          => Ready(r)
    }

    /** Symbolic alias for [[flatMap]]. */
    final def >>=[T](f: R => Pipe[I, O, T]): Pipe[I, O, T] = flatMap(f)

    /** Runs this pipe, discards its result, then continues with `p`. */
    final def >>[T](p: Pipe[I, O, T]): Pipe[I, O, T] = flatMap(_ => p)

    /** Symbolic alias for [[compose]]: `p` is the upstream of this pipe. */
    final def =<=[J](p: Pipe[J, I, R]): Pipe[J, O, R] = compose(p)

    /** Flipped [[compose]]: this pipe is the upstream of `p`. */
    final def =>=[J](p: Pipe[O, J, R]): Pipe[I, J, R] = p compose this
  }

  /** Emits `outp`; `next` produces the rest of the pipe on demand. */
  final case class Yield[I, O, R](outp: O, next: () => Pipe[I, O, R]) extends Pipe[I, O, R]

  /** Waits for an input and continues with the pipe returned by `next`. */
  final case class Await[I, O, R](next: I => Pipe[I, O, R]) extends Pipe[I, O, R]

  /** A finished pipe carrying its final `result`. */
  final case class Ready[I, O, R](result: R) extends Pipe[I, O, R]

  /** A pipe whose input type is `Unit`. */
  type Producer[O, R] = Pipe[Unit, O, R]

  /** A pipe whose output type is `Unit`. */
  type Consumer[I, R] = Pipe[I, Unit, R]

  /** A pipe whose input and output types are both `Unit`. */
  type Pipeline[R] = Pipe[Unit, Unit, R]

  /**
   * A finished pipe holding `r`.
   *
   * The result type is declared so that `A` and `B` take part in inference;
   * without it they were fixed to `Nothing` and the value could not be used
   * where a `Pipe[A, B, R]` was expected.
   */
  def `return`[A, B, R](r: R): Ready[A, B, R] = Ready[A, B, R](r)

  /** A finished pipe with the unit result. */
  def empty[A, B](): Pipe[A, B, Unit] = Ready[A, B, Unit](())

  /** Emits `b` once and then finishes. */
  def `yield`[A, B](b: B): Pipe[A, B, Unit] =
    Yield[A, B, Unit](b, () => empty[A, B]())

  /** Awaits a single input and finishes with it as the result. */
  def await[A, B]: Pipe[A, B, A] = Await[A, B, A](a => Ready[A, B, A](a))

  /**
   * Returns the yielded value and the continuation when `p` is a [[Yield]];
   * returns `None` for both [[Ready]] and [[Await]].
   */
  def next[A, B, R](p: Pipe[A, B, R]): Option[(B, () => Pipe[A, B, R])] = p match {
    case Yield(out, rest)    => Some((out, rest))
    case Ready(_) | Await(_) => None
  }

  /** Applies `f` to every awaited input and yields the outcome; never finishes on its own. */
  def lift[A, B, R](f: A => B): Pipe[A, B, R] =
    await[A, B].flatMap[R](a => `yield`[A, B](f(a)) >> lift[A, B, R](f))

  /** Forwards only the awaited inputs that satisfy `pred`; never finishes on its own. */
  def filter[A, R](pred: A => Boolean): Pipe[A, A, R] =
    await[A, A].flatMap[R] { a =>
      if (pred(a)) `yield`[A, A](a) >> filter[A, R](pred)
      else filter[A, R](pred)
    }

  /** Forwards `n` inputs and then finishes; finishes immediately when `n <= 0`. */
  def take[A](n: Int): Pipe[A, A, Unit] =
    if (n <= 0) empty[A, A]()
    else Await[A, A, Unit](a => Yield[A, A, Unit](a, () => take[A](n - 1)))

  /**
   * Folds the values yielded by `flow` from left to right, starting at `state`.
   * Stops at the first state that is not a [[Yield]] (see [[next]]).
   *
   * @param flow  pipe whose outputs are consumed
   * @param state initial accumulator
   * @param f     curried step function: accumulator, then element
   * @return the final accumulator
   */
  def fold[A, B, C, D](flow: Pipe[C, B, D])(state: A)(f: A => B => A): A = {
    @tailrec
    def loop(current: Pipe[C, B, D], acc: A): A = next(current) match {
      case Some((out, rest)) =>
        // Apply the step before forcing the continuation, so effects keep their order.
        val updated = f(acc)(out)
        loop(rest(), updated)
      case None => acc
    }
    loop(flow, state)
  }

  /**
   * Drives `flow`, discarding every yielded value.
   * Stops at the first state that is not a [[Yield]] (see [[next]]).
   */
  def run[A, B, C](flow: Pipe[A, B, C]): Unit = {
    @tailrec
    def loop(current: Pipe[A, B, C]): Unit = next(current) match {
      case Some((_, rest)) => loop(rest())
      case None            => ()
    }
    loop(flow)
  }

  /**
   * Producer that yields the elements of `input` in order.
   * The tail is built on demand, so long lists do not recurse deeply up front.
   */
  def list[A](input: List[A]): Producer[A, Unit] = input match {
    case head :: tail => Yield[Unit, A, Unit](head, () => list(tail))
    case Nil          => empty[Unit, A]()
  }

  //def cat[A, B, R](): Pipe[A, B, R] = Await(a => Yield(a, cat))

  /** Producer that yields `f(0)`, `f(1)`, ..., `f(n - 1)`; yields nothing when `n <= 0`. */
  def init[A](n: Int)(f: Int => A): Pipe[Unit, A, Unit] = {
    def loop(i: Int): Pipe[Unit, A, Unit] =
      if (i >= n) empty[Unit, A]()
      else Yield[Unit, A, Unit](f(i), () => loop(i + 1))
    loop(0)
  }

  /** Awaits one `Int`, yields its successor, then finishes. */
  def incr: Pipe[Int, Int, Unit] =
    await[Int, Int].flatMap[Unit](n => `yield`[Int, Int](n + 1))

  /** Producer that yields 0, 1, 2, ... without end. */
  def count(): Pipe[Unit, Int, Unit] = {
    def loop(n: Int): Pipe[Unit, Int, Unit] =
      Yield[Unit, Int, Unit](n, () => loop(n + 1))
    loop(0)
  }

  /** Producer that yields the Fibonacci sequence starting at 0, 1 without end. */
  def fib(): Pipe[Unit, BigInt, Unit] = {
    def loop(a: BigInt, b: BigInt): Pipe[Unit, BigInt, Unit] =
      Yield[Unit, BigInt, Unit](a, () => loop(b, a + b))
    loop(BigInt(0), BigInt(1))
  }
}
/** Demo: pushes the first 10000 Fibonacci numbers through `println`. */
object PipesTest extends App {
  import Pipes._

  // Fibonacci producer capped at 10000 elements.
  private val firstFibs = fib() =>= take(10000)

  // Printing happens as each element passes through the lifted stage.
  run(firstFibs =>= lift(println))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment