@pchiusano
Last active August 29, 2015 14:22
WIP design for new scalaz-stream core

All right, here we go! The API is quite a bit different from what we have currently, but it's simpler to use, more general, and the implementation can be much more efficient. The encoding for Tee, Process1, and Channel is greatly simplified: these become just regular functions, rather than a Process[F,_] with a funky F. Stepping a Process is now a first-class concept, exposed publicly, in a safe way. The resulting style looks a lot like ordinary list processing, and doing things like a 3-way or N-way merge is trivial.

The algebra and the canonical model for this algebra are given in the attached streams.scala. The algebra for streams is given by the trait Stream[P[+_[_],+_]], and there's a canonical instance, Stream[NF], which specifies what the operations mean. A couple notes:

  • The Process[F,A] data type we have currently would have a Stream instance. (Name of Stream TBD)
  • The Chunk type is TBD
  • Free is just the free monad (well, one formulation of it)
  • I still have some uncertainty about how to specify the meaning of interrupts in conjunction with the bracket operation. Intuitively, we'd like interrupts to halt the running process ASAP, while still ensuring any finalizers introduced by bracket are run exactly once.
  • This is just the algebra and a specification. Figuring out a more efficient implementation, making the syntax nice in Scala, and finding a way to not break too much user code with these changes are all TBD.
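
To make the relationship between the algebra and its canonical model concrete, here is a minimal sketch under simplifying assumptions: effects, chunks, and errors are dropped, so the algebra shrinks to `emits`, `append`, and `flatMap`. The names `StreamAlg`, `ListModel`, `DList`, `DListAlg`, `toModel`, and `twice` are hypothetical and do not appear in streams.scala. `ListModel` plays the role of `NF`, and `DListAlg` stands in for an alternative representation that has to agree with it.

```scala
object AlgebraSketch {
  // Simplified stand-in for the Stream algebra: no effects, chunks, or errors.
  trait StreamAlg[P[_]] {
    def emits[A](as: List[A]): P[A]
    def append[A](a: P[A], b: => P[A]): P[A]
    def flatMap[A, B](a: P[A])(f: A => P[B]): P[B]
  }

  // Canonical model: a stream is just the list of its outputs.
  object ListModel extends StreamAlg[List] {
    def emits[A](as: List[A]): List[A] = as
    def append[A](a: List[A], b: => List[A]): List[A] = a ::: b
    def flatMap[A, B](a: List[A])(f: A => List[B]): List[B] = a.flatMap(f)
  }

  // Alternative representation: a function that prepends its outputs to `rest`.
  type DList[A] = List[A] => List[A]

  object DListAlg extends StreamAlg[DList] {
    def emits[A](as: List[A]): DList[A] = rest => as ::: rest
    def append[A](a: DList[A], b: => DList[A]): DList[A] = rest => a(b(rest))
    def flatMap[A, B](a: DList[A])(f: A => DList[B]): DList[B] =
      rest => a(Nil).foldRight(rest)((x, acc) => f(x)(acc))
  }

  // Converts the alternative representation to the canonical model.
  def toModel[A](d: DList[A]): List[A] = d(Nil)

  // A program written once against the algebra, usable with either instance.
  def twice[P[_], A](S: StreamAlg[P])(p: P[A]): P[A] =
    S.flatMap(p)(a => S.emits(List(a, a)))
}
```

Checking that `toModel(DListAlg.append(p1, p2))` equals `ListModel.append(toModel(p1), toModel(p2))` mirrors the consistency law stated in the comment at the top of streams.scala.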

Programming style

In this model, Tee, Channel, etc aren't special. We just write a regular function that works over something with a Stream instance. So here's some sample code you'll be able to write, assuming Process has a Stream instance:

def zipWith[F[_],A,B,C](p: Process[F,A], p2: Process[F,B])(f: (A,B) => C): Process[F,C] =
  for {
    a <- p.await
    b <- p2.await
    tl <- Process.emit(f(a.head, b.head)) ++ zipWith(a.tail, b.tail)(f)
  } yield tl

Very straightforward style, almost like working with ordinary lists. If you want to work in chunks, you can use p.available instead of p.await there.
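
Since `Process` does not yet have a `Stream` instance, the snippet above cannot be run as written. The following is a minimal sketch that makes it executable under loud assumptions: `Proc` is a hypothetical, effect-free stand-in backed by `LazyList` (Scala 2.13 or later), and `Step`, `emit`, `emits`, and `await` are modeled on the names used in this document rather than taken from a real API.

```scala
object ZipWithSketch {
  // One pulled element plus the remainder of the stream.
  final case class Step[+A, +B](head: A, tail: B)

  // Hypothetical effect-free stand-in for Process[F, A].
  final case class Proc[+A](toLazyList: LazyList[A]) {
    // The right-hand side is only evaluated once the left side is exhausted.
    def ++[B >: A](that: => Proc[B]): Proc[B] =
      Proc(toLazyList.lazyAppendedAll(that.toLazyList))

    def flatMap[B](f: A => Proc[B]): Proc[B] =
      Proc(toLazyList.flatMap(a => f(a).toLazyList))

    def map[B](f: A => B): Proc[B] = Proc(toLazyList.map(f))

    // Emits a single Step if an element is available, nothing otherwise.
    def await: Proc[Step[A, Proc[A]]] =
      if (toLazyList.isEmpty) Proc(LazyList.empty)
      else Proc(LazyList(Step(toLazyList.head, Proc(toLazyList.tail))))
  }

  object Proc {
    def emit[A](a: A): Proc[A] = Proc(LazyList(a))
    def emits[A](as: A*): Proc[A] = Proc(LazyList.from(as))
  }

  // Same shape as the zipWith above, minus the effect parameter F.
  def zipWith[A, B, C](p: Proc[A], p2: Proc[B])(f: (A, B) => C): Proc[C] =
    for {
      a  <- p.await
      b  <- p2.await
      tl <- Proc.emit(f(a.head, b.head)) ++ zipWith(a.tail, b.tail)(f)
    } yield tl
}
```

When either input runs out, its `await` emits nothing, so the enclosing `flatMap` produces an empty stream and the recursion stops.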

Extending this a bit, we could allow asynchronous awaits:

def zipWith[F[_],A,B,C](p: Process[F,A], p2: Process[F,B])(f: (A,B) => C): Process[F,C] =
  for {
    a <- p.awaitAsync
    b <- p2.awaitAsync
    tl <- Process.emit(f(a.head, b.head)) ++ zipWith(a.tail, b.tail)(f)
  } yield tl

The semantics of this would be that a and b are backed by a running Future of some sort. A call to .head or .tail forces the Future. We may want to only allow these awaitAsync calls when F is suitably constrained (perhaps if F is Task).
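
One way to picture these semantics is the minimal sketch below, which uses `scala.concurrent.Future` on plain lists rather than a real `Process`. All names here (`Pending`, `force`, `Step`, and this list-based `awaitAsync`) are hypothetical, and the 10-second timeout is an arbitrary choice. Both pulls are started before either result is forced, which is the behavior described above.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object AwaitAsyncSketch {
  final case class Step[+A, +B](head: A, tail: B)

  // Handle to a pull that is already running; `force` blocks until it completes.
  final class Pending[A](running: Future[A]) {
    def force: A = Await.result(running, 10.seconds)
  }

  // Starts pulling the next element in the background (None when exhausted).
  def awaitAsync[A](as: List[A])(implicit ec: ExecutionContext): Pending[Option[Step[A, List[A]]]] =
    new Pending[Option[Step[A, List[A]]]](Future {
      as match {
        case hd :: tl => Some(Step(hd, tl))
        case Nil      => None
      }
    })

  def zipWith[A, B, C](p: List[A], p2: List[B])(f: (A, B) => C)(implicit ec: ExecutionContext): List[C] = {
    val a = awaitAsync(p)  // started first
    val b = awaitAsync(p2) // started before `a` is forced
    (a.force, b.force) match {
      case (Some(sa), Some(sb)) => f(sa.head, sb.head) :: zipWith(sa.tail, sb.tail)(f)
      case _                    => Nil
    }
  }
}
```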

The existence of awaitAsync / availableAsync means these functions subsume everything that can be done using Wye, though we have much more flexibility (no need to combine streams two at a time).

We then have:

// Scala can't represent these types as is, but you get the idea
type Process1[-A,+B] = forall F . Process[F,A] => Process[F,B]
type Tee[-A,-B,+C] = forall F . (Process[F,A], Process[F,B]) => Process[F,C]
type Wye[-A,-B,+C] = forall F : Async . (Process[F,A], Process[F,B]) => Process[F,C]
type Channel[F[_],-A,+B] = Process[F,A] => Process[F,B]

We can still have all the same functions as before and keep the overall organization of the library. We can give Stream instances for these types (they will act on the output Process, like you would expect), but users can also just program directly with regular functions that accept and return Process values.
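
As the comment in the type definitions above notes, Scala cannot express `forall F` in a type alias directly. One common workaround, shown here only as a sketch, is a trait with a polymorphic `apply` method. `Proc` is a hypothetical stand-in in which `F` is a phantom parameter and the outputs are a plain `List`; the `lift` and `zip` helpers are likewise illustrative and not part of the proposed API.

```scala
object RankTwoSketch {
  // Hypothetical stand-in for Process[F, A]; F is unused and only marks the effect type.
  final case class Proc[F[_], +A](values: List[A])

  // Encodes `forall F . Process[F,A] => Process[F,B]` as a polymorphic method.
  trait Process1[-A, +B] {
    def apply[F[_]](p: Proc[F, A]): Proc[F, B]
  }

  // Same idea for a transducer with two inputs.
  trait Tee[-A, -B, +C] {
    def apply[F[_]](l: Proc[F, A], r: Proc[F, B]): Proc[F, C]
  }

  // Lifts a pure function into a Process1 that works for every F.
  def lift[A, B](f: A => B): Process1[A, B] = new Process1[A, B] {
    def apply[F[_]](p: Proc[F, A]): Proc[F, B] = Proc[F, B](p.values.map(f))
  }

  // Pairs up elements from both inputs, stopping at the shorter one.
  def zip[A, B]: Tee[A, B, (A, B)] = new Tee[A, B, (A, B)] {
    def apply[F[_]](l: Proc[F, A], r: Proc[F, B]): Proc[F, (A, B)] =
      Proc[F, (A, B)](l.values.zip(r.values))
  }
}
```

Because `apply` is generic in `F`, an implementation cannot inspect or depend on the effect type, which is the guarantee the `forall` in the pseudo-types is meant to convey.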

Other notes:

  • The actual Stream[Process] instance will still have to special-case append and flatMap to avoid quadratic performance
  • We should probably add a translate function Process[F,A] => Process[G,A], like we have now.
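
The quadratic behavior mentioned in the first note can be illustrated with ordinary lists, which have a similar cost profile to a naive cons-style append: each left-nested append re-walks everything accumulated so far. The sketch below is only a cost model, not the proposed implementation; `leftNested` and `rightNested` are hypothetical helper names that count how many elements get copied.

```scala
object AppendCostSketch {
  // Appends one element at a time on the right: (((Nil ++ 0) ++ 1) ++ 2) ...
  def leftNested(n: Int): Long = {
    var copied = 0L
    var acc = List.empty[Int]
    for (i <- 0 until n) {
      copied += acc.length // `acc ::: List(i)` copies all of `acc`
      acc = acc ::: List(i)
    }
    copied
  }

  // Prepends one element at a time: 0 ++ (1 ++ (2 ++ ... Nil))
  def rightNested(n: Int): Long = {
    var copied = 0L
    var acc = List.empty[Int]
    for (i <- (n - 1) to 0 by -1) {
      copied += 1 // `List(i) ::: acc` copies only the one-element prefix
      acc = List(i) ::: acc
    }
    copied
  }
}
```

For n appends the left-nested count is n(n-1)/2 while the right-nested count is n, which is why reassociating appends matters.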
/*
`P` denotes a `NF` (normal form) value. For a `Stream` instance, `S`,
to be valid, it must be the case that:
NF(S.append(p1, p2)) == NF.append(NF(p1), NF(p2))
NF(S.flatMap(p)(f)) == NF.flatMap(NF(p))(f andThen NF)
(Notation - `NF(p)` converts `p` to its `NF` equivalent.)
... and so on for each operation of `Stream`. That is, the operations
are implemented in a way that is consistent with the implementation
for denotations.
*/
trait Stream[P[+_[_],+_]] {
  def emits[F[_],A](as: Chunk[A]): P[F,A]
  def emit[F[_],A](a: A): P[F,A] = emits(Chunk.singleton(a))
  def empty[A]: P[Nothing,A] = emits(Chunk.empty)
  def append[F[_],A](a: P[F,A], b: => P[F,A]): P[F,A]
  def flatMap[F[_],A,B](a: P[F,A])(f: A => P[F,B]): P[F,B]
  def available[F[_],A](p: P[F, A]): P[F, Step[Chunk[A], P[F,A]]]
  def await[F[_],A](p: P[F,A]): P[F, Step[A, P[F,A]]] = flatMap(available(p)) { step =>
    Chunk.uncons(step.head) match {
      case None => empty
      case Some((hd,tl)) => emit { Step(hd, append(emits(tl), step.tail)) }
    }
  }
  def fail[F[_],A](e: Throwable): P[F,A]
  def onError[F[_],A](p: P[F,A])(handle: Throwable => P[F,A]): P[F,A]
  def bracket[F[_],R,A](acquire: P[F,R])(use: R => P[F,A], release: R => P[F,Unit]): P[F,A]
  def free[F[_],A](fa: Free[F,P[F,A]]): P[F,A]
  def eval[F[_],A](fa: F[A]): P[F,A] = free(Free.Eval(fa) flatMap (a => Free.Pure(emit(a))))
}
/**
* Canonical implementation of `Stream`. Not efficient enough, but any
* more efficient implementation must still be consistent with this one.
*/
trait NF[+F[_],+A] // covariant in F so that NF conforms to the kind expected by Stream[P[+_[_],+_]]
object NF extends Stream[NF] {
  case class Emits[F[_],A](c: Chunk[A]) extends NF[F,A]
  case class Cons[F[_],A](hd: Chunk[A], a2: () => NF[F,A]) extends NF[F,A]
  case class Fail[F[_],A](err: Throwable) extends NF[F,A]
  case class Await[F[_],A](f: Free[F,NF[F,A]]) extends NF[F,A]
  def emits[F[_],A](c: Chunk[A]): NF[F,A] = Emits(c)
  def fail[F[_],A](e: Throwable): NF[F,A] = Fail(e)
  def free[F[_],A](f: Free[F,NF[F,A]]): NF[F,A] = Await(f)
  def flatMap[F[_],A,B](a: NF[F,A])(f: A => NF[F,B]): NF[F,B] = a match {
    case Fail(e) => Fail(e)
    case Emits(c) => Chunk.uncons(c) match {
      case None => emits(Chunk.empty)
      case Some((hd,tl)) => append(f(hd), flatMap[F,A,B](emits(tl))(f))
    }
    case Cons(h, t) => append(flatMap[F,A,B](emits(h))(f), flatMap(t())(f))
    case Await(g) => Await(g map (a => flatMap(a)(f)))
  }
  def onError[F[_],A](a: NF[F,A])(handle: Throwable => NF[F,A]): NF[F,A] = a match {
    case Fail(e) => handle(e)
    case Emits(_) => a
    case Await(g) => Await(g map (a => onError(a)(handle)))
    case Cons(h, t) => Cons(h, () => onError(t())(handle))
  }
  def mask[F[_],A](a: NF[F,A]): NF[F,A] =
    onError(a)(e => emits(Chunk.empty))
  def available[F[_],A](a: NF[F,A]): NF[F, Step[Chunk[A], NF[F,A]]] = a match {
    case Fail(e) => Fail(e)
    case Emits(c) => Emits(Chunk.singleton(Step(c, Emits(Chunk.empty))))
    case Await(f) => Await(f map available)
    case Cons(h, t) => Emits(Chunk.singleton(Step(h, t())))
  }
  def drain[F[_],A,B](a: NF[F,A]): NF[F,B] = a match {
    case Fail(e) => Fail(e)
    case Emits(_) => Emits(Chunk.empty)
    case Await(g) => Await(g map drain)
    case Cons(_, t) => Cons(Chunk.empty, () => drain(t()))
  }
  def bracket[F[_],R,A](acquire: NF[F,R])(use: R => NF[F,A], release: R => NF[F,Unit]): NF[F,A] =
    flatMap(acquire) { r =>
      val cleanup = drain[F,Unit,A](mask(release(r)))
      onError(append(use(r), cleanup)) { e => append(cleanup, fail(e)) }
    }
  def append[F[_],A](a1: NF[F,A], a2: => NF[F,A]): NF[F,A] = a1 match {
    case Emits(c) => Cons(c, () => a2)
    case Fail(e) => Fail(e)
    case Cons(h,t) => Cons(h, () => append(t(),a2))
    case Await(f) => Await(f map (a1 => append(a1, a2)))
  }
}
/** A formulation of the free monad. */
trait Free[+F[_],+A] {
  import Free._
  def flatMap[F2[x]>:F[x],B](f: A => Free[F2,B]): Free[F2,B] = Bind(this, f)
  def map[B](f: A => B): Free[F,B] = Bind(this, f andThen (Free.Pure(_)))
}
object Free {
  case class Pure[A](a: A) extends Free[Nothing,A]
  case class Eval[F[_],A](fa: F[A]) extends Free[F,A]
  case class Bind[F[_],R,A](r: Free[F,R], f: R => Free[F,A]) extends Free[F,A]
}