Last active
August 21, 2018 05:16
-
-
Save mpilquist/1920e7221f5e507eb39e7e6372ce3c2e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package fs3 | |
import cats.Monad | |
/**
 * A program over the instruction set `F` that yields a value of type `R`.
 *
 * Values are built from the constructors in the companion object; this class
 * itself only offers monadic sequencing.
 *
 * @tparam F the instruction set
 * @tparam R the result type of the program
 */
sealed abstract class Free[F[_], R] { self =>

  /**
   * Sequences this program with a continuation.
   *
   * Nothing is evaluated here: the receiver and `f` are simply recorded in a
   * `Free.Bind` node.
   *
   * @param f builds the next program from this program's result
   * @tparam R2 result type of the combined program
   * @return a program that represents this program followed by `f`
   */
  def flatMap[R2](f: R => Free[F, R2]): Free[F, R2] =
    Free.Bind[F, R, R2](self, f)
}
/** Constructors of [[Free]] programs. */
object Free {

  /** A finished program holding the result `r`. */
  case class Pure[F[_], R](r: R)
    extends Free[F, R]

  /** A single instruction `fr` of the instruction set `F`. */
  case class Eval[F[_], R](fr: F[R])
    extends Free[F, R]

  /** The program `fx` followed by the continuation `f` applied to its result. */
  case class Bind[F[_], X, R](
    fx: Free[F, X],
    f: X => Free[F, R]
  ) extends Free[F, R]
}
/**
 * The outcome of stepping a [[Free]] program from the left: either its final
 * result (`Done`) or its first instruction together with the rest of the
 * program (`Bound`).
 */
sealed abstract class ViewL[F[_], R]

object ViewL {

  /** The first instruction `fx` plus the continuation `f` to run on its result. */
  case class Bound[F[_], X, R](fx: F[X], f: X => Free[F, R]) extends ViewL[F, R]

  /** The program finished with the result `r`. */
  case class Done[F[_], R](r: R) extends ViewL[F, R]

  /**
   * The continuation accumulated while descending through nested `Bind` nodes:
   * either nothing is pending (`Id`, in which case `A` and `B` coincide) or a
   * function from `A` to the next `F[B]` (`Wrapped`).
   */
  private sealed abstract class K[F[_], A, B] {
    def fold[X](ifId: (A => B) => X, ifFunction: (A => F[B]) => X): X
  }

  private object K {

    case class Id[F[_], A]() extends K[F, A, A] {
      def fold[X](ifId: (A => A) => X, ifFunction: (A => F[A]) => X): X =
        ifId((a: A) => a)
    }

    case class Wrapped[F[_], A, B](f: A => F[B]) extends K[F, A, B] {
      def fold[X](ifId: (A => B) => X, ifFunction: (A => F[B]) => X): X =
        ifFunction(f)
    }
  }

  /**
   * Steps `free` until it exposes either a result or its first instruction.
   *
   * @param free the program to inspect
   * @return `Done` if the program reduces to a pure value, otherwise `Bound`
   *         with the first `Eval` instruction and the remaining program
   */
  def apply[F[_], R](free: Free[F, R]): ViewL[F, R] = {

    // Walks down the left spine of `current`, carrying in `pending` everything
    // that still has to run after `current` produces its value.
    def step[X](current: Free[F, X], pending: K[Free[F, ?], X, R]): ViewL[F, R] =
      current match {
        case Free.Pure(x) =>
          pending.fold(
            finish => ViewL.Done(finish(x)),
            // A pending continuation exists: feed it the value and view the result.
            next => apply(next(x))
          )

        case Free.Eval(fx) =>
          pending.fold(
            finish => ViewL.Bound(fx, (x: X) => Free.Pure(finish(x))),
            next => ViewL.Bound(fx, next)
          )

        case bind: Free.Bind[F, _, X] =>
          // The intermediate type of the bind is erased, so it is treated as Any.
          val source: Free[F, Any] = bind.fx.asInstanceOf[Free[F, Any]]
          val first: Any => Free[F, X] = bind.f.asInstanceOf[Any => Free[F, X]]
          val composed: K[Free[F, ?], Any, R] =
            pending.fold[K[Free[F, ?], Any, R]](
              // Nothing pending means X and R are the same type; the cast only
              // records that fact.
              _ => K.Wrapped[Free[F, ?], Any, R](w => first(w).asInstanceOf[Free[F, R]]),
              next => K.Wrapped[Free[F, ?], Any, R](w => first(w).flatMap(next))
            )
          step(source, composed)
      }

    step[R](free, K.Id[Free[F, ?], R]())
  }
}
/**
 * The instructions a [[Pull]] is made of.
 *
 * @tparam F the effect type
 * @tparam O the type of emitted values
 * @tparam R the result type of the instruction
 */
sealed trait Algebra[F[_], O, R]

object Algebra {

  /** Carries the effect `value`, whose result becomes the instruction's result. */
  case class Wrap[F[_], O, R](value: F[R])
    extends Algebra[F, O, R]

  /** Carries a batch of values to emit. */
  case class Output[F[_], O, R](values: Catenable[O])
    extends Algebra[F, O, R]

  /** Carries a whole stream whose values are to be emitted. */
  case class Outputs[F[_], O, R](stream: Stream[F, O])
    extends Algebra[F, O, R]
}
/**
 * A program over [[Algebra]] with effect type `F`, output type `O` and result
 * type `R`. This class is a thin wrapper around the underlying `Free` value.
 *
 * @param algebra the wrapped program
 */
final class Pull[F[_], O, R](val algebra: Free[Algebra[F, O, ?], R]) {

  /**
   * Sequences this pull with the pull produced by `f` from its result.
   *
   * @param f builds the next pull from this pull's result
   */
  def flatMap[R2](f: R => Pull[F, O, R2]): Pull[F, O, R2] = {
    val sequenced: Free[Algebra[F, O, ?], R2] =
      algebra.flatMap[R2](r => f(r).algebra)
    new Pull[F, O, R2](sequenced)
  }

  /**
   * Runs this pull, discards its result, then runs `then`.
   *
   * The parameter is backquoted because `then` is a keyword in Scala 3; its
   * name is unchanged.
   */
  def >>[R2](`then`: Pull[F, O, R2]): Pull[F, O, R2] =
    flatMap(_ => `then`)

  /** Widens the effect type. Implemented as an unchecked cast. */
  def covary[F2[x] >: F[x]]: Pull[F2, O, R] =
    this.asInstanceOf[Pull[F2, O, R]]

  /** Widens the output type. Implemented as an unchecked cast. */
  def covaryOutput[O2 >: O]: Pull[F, O2, R] =
    this.asInstanceOf[Pull[F, O2, R]]

  /** Widens the result type `R`. Implemented as an unchecked cast. */
  def covaryResource[R2 >: R]: Pull[F, O, R2] =
    this.asInstanceOf[Pull[F, O, R2]]
}
/** Constructors of [[Pull]] values. */
object Pull {

  /** Wraps an existing `Free` program over [[Algebra]]. */
  def apply[F[_], O, R](value: Free[Algebra[F, O, ?], R]): Pull[F, O, R] =
    new Pull[F, O, R](value)

  /** A pull consisting of the single effect `fr`; its result is the effect's result. */
  def eval[F[_], O, R](fr: F[R]): Pull[F, O, R] = {
    val instruction: Algebra[F, O, R] = Algebra.Wrap[F, O, R](fr)
    apply[F, O, R](Free.Eval[Algebra[F, O, ?], R](instruction))
  }

  /** A pull that emits the batch `os`. */
  def output[F[_], O](os: Catenable[O]): Pull[F, O, Unit] = {
    val instruction: Algebra[F, O, Unit] = Algebra.Output[F, O, Unit](os)
    apply[F, O, Unit](Free.Eval[Algebra[F, O, ?], Unit](instruction))
  }

  /** A pull that emits the single value `o`. */
  def output1[F[_], O](o: O): Pull[F, O, Unit] =
    output[F, O](Catenable.single(o))

  /** A pull that emits everything the stream `s` emits. */
  def outputs[F[_], O](s: Stream[F, O]): Pull[F, O, Unit] = {
    val instruction: Algebra[F, O, Unit] = Algebra.Outputs[F, O, Unit](s)
    apply[F, O, Unit](Free.Eval[Algebra[F, O, ?], Unit](instruction))
  }

  /** A pull that emits nothing and immediately yields `r`. */
  def pure[F[_], O, R](r: R): Pull[F, O, R] =
    apply[F, O, R](Free.Pure[Algebra[F, O, ?], R](r))
}
/**
 * A stream of `O` values with effects in `F`, represented by a [[Pull]] that
 * emits `O`s and finishes with `Unit`.
 *
 * @param pull the pull that produces this stream's output
 */
final class Stream[F[_], O](val pull: Pull[F, O, Unit]) {

  /** Widens the effect type. Implemented as an unchecked cast. */
  def covary[F2[x] >: F[x]]: Stream[F2, O] = this.asInstanceOf[Stream[F2, O]]

  /** Widens the output type. Implemented as an unchecked cast. */
  def covaryOutput[O2 >: O]: Stream[F, O2] = this.asInstanceOf[Stream[F, O2]]

  /**
   * Replaces every element of this stream with the stream `f` produces for it,
   * concatenating the results in order.
   *
   * @param f maps each element to the stream that takes its place
   */
  def flatMap[O2](f: O => Stream[F, O2]): Stream[F, O2] = Stream.fromPull[F, O2] {
    // The type argument is explicit: `uncons` is a receiver here, so nothing
    // else would pin the output type of the pull it returns.
    uncons[O2].flatMap {
      case None => Pull.pure[F, O2, Unit](())
      case Some((hd, tl)) =>
        // Chain the pulls of all mapped head elements, then continue with the tail.
        val mappedHead: Pull[F, O2, Unit] =
          hd.map(f).map(_.pull).toList
            .foldRight(Pull.pure[F, O2, Unit](()))((o, acc) => o >> acc)
        Pull.outputs(Stream.fromPull(mappedHead)) >> tl.flatMap(f).pull
    }
  }

  /**
   * Shorthand for `flatMap(_ => then)`: each element of this stream is
   * replaced by `then`.
   *
   * The parameter is backquoted because `then` is a keyword in Scala 3; its
   * name is unchanged.
   */
  def >>[O2](`then`: Stream[F, O2]): Stream[F, O2] = flatMap(_ => `then`)

  /**
   * Steps this stream to its next batch of output.
   *
   * @tparam X output type of the returned pull, which never emits anything
   * @return a pull yielding `None` when the stream is finished, or
   *         `Some((batch, rest))` with the next batch and the remaining stream
   */
  def uncons[X]: Pull[F, X, Option[(Catenable[O], Stream[F, O])]] =
    ViewL[Algebra[F, O, ?], Unit](pull.algebra) match {
      case _: ViewL.Done[Algebra[F, O, ?], Unit] =>
        Pull.pure(None)

      case bound: ViewL.Bound[Algebra[F, O, ?], _, Unit] =>
        bound.fx match {
          case wrap: Algebra.Wrap[F, O, _] =>
            // The effect's result type is erased, so it is treated as Any.
            val wrapped: F[Any] = wrap.value.asInstanceOf[F[Any]]
            val f0 = bound.f.asInstanceOf[Any => Free[Algebra[F, O, ?], Unit]]
            // Run the effect, then keep looking for output in what follows it.
            Pull.eval[F, X, Any](wrapped).flatMap { (x: Any) =>
              Stream.fromPull[F, O](Pull[F, O, Unit](f0(x))).uncons[X]
            }

          case Algebra.Output(os) =>
            val f = bound.f.asInstanceOf[Unit => Free[Algebra[F, O, ?], Unit]]
            Pull.pure(Some((os, Stream.fromPull[F, O](Pull[F, O, Unit](f(()))))))

          case Algebra.Outputs(s) =>
            val f = bound.f.asInstanceOf[Unit => Free[Algebra[F, O, ?], Unit]]
            s.uncons[X].flatMap {
              // The nested stream is exhausted: carry on with the continuation.
              case None =>
                Stream.fromPull[F, O](Pull[F, O, Unit](f(()))).uncons[X]
              case Some((hd, tl)) =>
                // The remainder is the nested stream's tail FOLLOWED BY the
                // continuation. Sequencing is done on the pulls: `Stream.>>`
                // is a flatMap and would instead replace every element of
                // `tl` with the continuation (and drop it when `tl` is empty).
                val rest: Stream[F, O] =
                  Stream.fromPull[F, O](tl.pull >> Pull[F, O, Unit](f(())))
                Pull.pure(Some((hd, rest)))
            }
        }
    }

  /**
   * Runs the stream, folding every emitted value into an accumulator.
   *
   * @param init the initial accumulator
   * @param f combines the accumulator with the next emitted value
   * @param F the monad used to run and sequence the `F` effects
   * @return the final accumulator, in `F`
   */
  def runFold[A](init: A)(f: (A, O) => A)(implicit F: Monad[F]): F[A] = {
    // Result type of one `uncons` step.
    type Step = Option[(Catenable[O], Stream[F, O])]

    def go(acc: A, v: ViewL[Algebra[F, O, ?], Step]): F[A] = v match {
      case done: ViewL.Done[Algebra[F, O, ?], Step] =>
        done.r match {
          case None => F.pure(acc)
          case Some((hd, tl)) =>
            go(hd.toList.foldLeft(acc)(f), ViewL[Algebra[F, O, ?], Step](tl.uncons[O].algebra))
        }

      case bound: ViewL.Bound[Algebra[F, O, ?], _, Step] =>
        bound.fx match {
          case wrap: Algebra.Wrap[F, O, _] =>
            val wrapped: F[Any] = wrap.value.asInstanceOf[F[Any]]
            val g = bound.f.asInstanceOf[Any => Free[Algebra[F, O, ?], Step]]
            F.flatMap(wrapped) { x => go(acc, ViewL[Algebra[F, O, ?], Step](g(x))) }
          // The pull returned by `uncons` only evaluates effects or returns a
          // value; it contains no output instructions of its own.
          case Algebra.Output(_) => sys.error("impossible")
          case Algebra.Outputs(_) => sys.error("impossible")
        }
    }

    go(init, ViewL[Algebra[F, O, ?], Step](uncons[O].algebra))
  }

  /** Runs the stream and collects every emitted value, in order, into a `Vector`. */
  def runLog(implicit F: Monad[F]): F[Vector[O]] =
    runFold(Vector.empty[O])((acc, o) => acc :+ o)
}
/** Constructors of [[Stream]] values. */
object Stream {

  /**
   * A stream that evaluates `fo` and emits its result as the only element.
   *
   * Type arguments are spelled out because `Pull.eval(fo)` is used as a
   * method receiver, where no expected type is available to determine the
   * pull's output type.
   */
  def eval[F[_], O](fo: F[O]): Stream[F, O] =
    fromPull[F, O](Pull.eval[F, O, O](fo).flatMap(o => Pull.output1[F, O](o)))

  /** A stream that emits the single value `o`. */
  def emit[F[_], O](o: O): Stream[F, O] =
    fromPull[F, O](Pull.output1[F, O](o))

  /** Wraps a pull that emits `O`s and finishes with `Unit` as a stream. */
  def fromPull[F[_], O](pull: Pull[F, O, Unit]): Stream[F, O] =
    new Stream[F, O](pull)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment