All right, here we go! The API is quite a bit different from what we have currently, but it's simpler to use, more general, and the implementation can be much more efficient. The encoding for `Tee`, `Process1`, and `Channel` is greatly simplified: these become just regular functions, rather than a `Process[F,_]` with a funky `F`. Stepping a `Process` is now a first-class concept, exposed publicly, in a safe way. The resulting style looks a lot like ordinary list processing, and doing things like a 3-way or N-way merge is trivial.
The algebra and the canonical model for this algebra are given in the attached `streams.scala`. The algebra for streams is given by the trait `Stream[P[+_[_],+_]]`, and there's a canonical instance, `Stream[NF]`, which specifies what the operations mean. A couple of notes:
- The `Process[F,A]` data type we have currently would have a `Stream` instance. (Name of `Stream` TBD)
- The `Chunk` type is TBD
- `Free` is just the free monad (well, one formulation of it)
- I still have some uncertainty about how to specify the meaning of interrupts in conjunction with the `bracket` operation. Intuitively, we'd like interrupts to halt the running process ASAP, while still ensuring any finalizers introduced by `bracket` are run exactly once.
- This is just the algebra and a specification; figuring out a more efficient implementation, making the syntax nice in Scala, and finding a way to not break too much user code with these changes is all TBD.
In this model, `Tee`, `Channel`, etc. aren't special. We just write a regular function that works over something with a `Stream` instance. So here's some sample code you'll be able to write, assuming `Process` has a `Stream` instance:

    def zipWith[F[_],A,B,C](p: Process[F,A], p2: Process[F,B])(f: (A,B) => C): Process[F,C] =
      for {
        a <- p.await
        b <- p2.await
        tl <- Process.emit(f(a.head, b.head)) ++ zipWith(a.tail, b.tail)(f)
      } yield tl

Very straightforward style, almost like working with ordinary lists. If you want to work in chunks, you can use `p.available` instead of `p.await` there.
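To make the "ordinary list processing" comparison concrete, here is a minimal sketch on a toy model, not the proposed `Stream` algebra. It assumes a pure stream backed by Scala 2.13's `LazyList` with no effect type `F`; the names `ToyZip`, `Step`, `uncons`, and `zipN` are hypothetical stand-ins, with `uncons` playing the role of `await`.

```scala
// Toy model only: pure LazyList-backed streams, no effect type F.
// `Step`, `uncons` and `zipN` are hypothetical names, not part of streams.scala.
object ToyZip {
  // What an `await` hands back in this model: the next element plus the rest.
  final case class Step[A](head: A, tail: LazyList[A])

  // Stand-in for `await`: None when the stream is exhausted.
  def uncons[A](s: LazyList[A]): Option[Step[A]] =
    if (s.isEmpty) None else Some(Step(s.head, s.tail))

  // Same shape as the zipWith above: step both inputs, emit, recurse on the tails.
  def zipWith[A, B, C](p: LazyList[A], p2: LazyList[B])(f: (A, B) => C): LazyList[C] =
    (for {
      a <- uncons(p)
      b <- uncons(p2)
    } yield f(a.head, b.head) #:: zipWith(a.tail, b.tail)(f)).getOrElse(LazyList.empty)

  // The same idea for N inputs: stop as soon as any input is exhausted.
  def zipN[A](ps: List[LazyList[A]]): LazyList[List[A]] =
    if (ps.isEmpty || ps.exists(_.isEmpty)) LazyList.empty
    else ps.map(_.head) #:: zipN(ps.map(_.tail))
}
```

For example, `ToyZip.zipWith(LazyList(1, 2, 3), LazyList(10, 20))(_ + _)` stops when the shorter input runs out. The real design would thread an effect `F` through each step, which this toy deliberately leaves out.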
Extending this a bit, we could allow asynchronous awaits:

    def zipWith[F[_],A,B,C](p: Process[F,A], p2: Process[F,B])(f: (A,B) => C): Process[F,C] =
      for {
        a <- p.awaitAsync
        b <- p2.awaitAsync
        tl <- Process.emit(f(a.head, b.head)) ++ zipWith(a.tail, b.tail)(f)
      } yield tl

The semantics of this would be that `a` and `b` are backed by a running `Future` of some sort. A call to `.head` or `.tail` forces the `Future`. We may want to only allow these `awaitAsync` calls when `F` is suitably constrained (perhaps if `F` is `Task`).
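As a rough illustration of "backed by a running `Future`", the sketch below uses `scala.concurrent.Future` from the standard library rather than `Task`, and a plain `List` rather than a `Process`. The names `AsyncStepSketch` and `AsyncStep`, the 5-second timeout, and this version of `awaitAsync` are all assumptions made for the example, not the proposed API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AsyncStepSketch {
  // Hypothetical: a step whose contents come from an already-running Future.
  final class AsyncStep[A](running: Future[(A, List[A])]) {
    // Forcing blocks the caller until the background computation completes.
    // The 5-second timeout is an arbitrary choice for this sketch.
    private lazy val forced: (A, List[A]) = Await.result(running, 5.seconds)
    def head: A = forced._1
    def tail: List[A] = forced._2
  }

  // Stand-in for `awaitAsync`: starts evaluating `xs` right away and returns
  // without blocking. An empty input fails when forced; a real design would
  // have to represent termination explicitly.
  def awaitAsync[A](xs: => List[A]): AsyncStep[A] =
    new AsyncStep(Future { val l = xs; (l.head, l.tail) })
}
```

With this shape, calling `awaitAsync` on two inputs starts both computations before either `.head` is forced, which is the behavior the asynchronous `zipWith` relies on.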
The existence of `awaitAsync` / `availableAsync` means these functions subsume everything that can be done using `Wye`, though we have much more flexibility (no need to combine streams two at a time).
We then have:

    // Scala can't represent these types as is, but you get the idea
    type Process1[-A,+B] = forall F . Process[F,A] => Process[F,B]
    type Tee[-A,-B,+C] = forall F . (Process[F,A], Process[F,B]) => Process[F,C]
    type Wye[-A,-B,+C] = forall F : Async . (Process[F,A], Process[F,B]) => Process[F,C]
    type Channel[F[_],-A,+B] = Process[F,A] => Process[F,B]

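One common way to approximate the `forall F` in Scala is a trait with a polymorphic `apply` method. The sketch below assumes a placeholder `Process` that just wraps a `List` so the example compiles on its own; `Rank2Sketch` and `lift` are hypothetical names, and none of this is claimed to be the encoding the library would adopt.

```scala
object Rank2Sketch {
  // Placeholder so the sketch is self-contained; F is unused (phantom) here.
  final case class Process[F[_], +A](values: List[A])

  // forall F . Process[F,A] => Process[F,B]
  trait Process1[-A, +B] {
    def apply[F[_]](p: Process[F, A]): Process[F, B]
  }

  // forall F . (Process[F,A], Process[F,B]) => Process[F,C]
  trait Tee[-A, -B, +C] {
    def apply[F[_]](l: Process[F, A], r: Process[F, B]): Process[F, C]
  }

  // A Process1 built from a plain function; it works for every F.
  def lift[A, B](f: A => B): Process1[A, B] = new Process1[A, B] {
    def apply[F[_]](p: Process[F, A]): Process[F, B] =
      Process[F, B](p.values.map(f))
  }
}
```

Because `apply` is polymorphic in `F`, a value of type `Process1[A,B]` cannot inspect or depend on the effect type, which is the guarantee the `forall` is meant to express.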
We can still have all the same functions as before and keep the overall organization of the library. We can give `Stream` instances for these types (they will act on the output `Process`, like you would expect), but users can also just program directly with regular functions that accept and return `Process` values.
Other notes:
- The actual `Stream[Process]` instance will still have to special-case `append` and `flatMap` to avoid quadratic performance
- We should probably add a `translate` function `Process[F,A] => Process[G,A]`, like we have now.
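For reference, the shape of `translate` can be sketched with a natural transformation from `F` to `G`. This assumes a placeholder `Process` that is just a list of effectful steps; `TranslateSketch` and `Nat` are hypothetical names introduced for the example, and the real signature may well differ.

```scala
object TranslateSketch {
  // Hypothetical natural transformation: converts F[A] to G[A] for any A.
  trait Nat[F[_], G[_]] {
    def apply[A](fa: F[A]): G[A]
  }

  // Placeholder: a stream modelled as a list of effectful steps.
  final case class Process[F[_], A](steps: List[F[A]])

  // Change the effect type of every step, leaving the elements alone.
  def translate[F[_], G[_], A](p: Process[F, A])(nt: Nat[F, G]): Process[G, A] =
    Process[G, A](p.steps.map(fa => nt(fa)))
}
```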