Skip to content

Instantly share code, notes, and snippets.

Last active June 5, 2020 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Alternative encoding for ZIO stream abstractions
package zio
import{ InputStream, OutputStream }
import java.nio.charset.StandardCharsets
// Experimental alternative encoding of the ZIO stream abstractions (ZStream, ZSink, ZTransducer),
// in which chunks are ordinary element values (e.g. `ZStream[..., Chunk[Byte]]`) rather than an
// internal detail.
// NOTE(review): this listing was extracted from a web page and many lines are missing (unbalanced
// braces, lines beginning with `.`, truncated expressions), so the code does not compile as shown.
object streaming {
// Effect producing the next element. By the convention used in this file, failing with `None`
// signals end-of-stream (`Pull.end`), while `Some(e)` carries a genuine error.
type Pull[-R, +E, +I] = ZIO[R, Option[E], I]
// Effectful consumer of a single input (the input side of a ZSink).
type Push[-R, +E, -I] = I => ZIO[R, E, Unit]
// Effectful one-input, one-output transformation (the step of a ZTransducer).
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O]
// Example pipeline: decode the bytes read from `is` as UTF-8, split the text into lines, and print
// each line to the console.
// NOTE(review): the line(s) building the source stream from `is` are missing from this listing
// (presumably `ZStream.fromInputStream(is)`) — TODO restore.
def example1(is: InputStream): URIO[ZEnv, Unit] =
.run(ZTransducer.utf8Decode >>> ZTransducer.newLines >>> ZSink.putStrLn.chunked)
// A sink: a managed pair of (`push`, consuming one input at a time; `done`, an effect producing
// the final result `Z`, which `ZStream.run` evaluates once the stream has ended).
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) {
// Lifts this sink to consume chunks: every element of an incoming chunk is pushed in order via
// `ZIO.foreach_`. The result effect `done` is passed through unchanged.
// NOTE(review): the call mapping over `process` appears to be missing from the next line — TODO restore.
def chunked: ZSink[R, E, Chunk[I], Z] =
ZSink( { case (push, done) => (ZIO.foreach_(_)(push), done) })
// A stream: a managed `Pull` effect that is evaluated repeatedly to obtain successive elements.
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) {
// Pipes this stream through `transducer`. Each upstream element is fed to the transducer's `step`
// and the result is emitted. When upstream ends, the transducer's `last` runs: `None` ends the
// stream, and `Some(a)` sets the `done` flag and emits `a`. Once `done` is true, every pull ends.
// NOTE(review): several lines are missing here: the managed value being pattern-matched (this
// stream's pull zipped with a `done` flag, plus the transducer's process), the receiver of
// `.fold` on the failure path (presumably `Cause.sequenceCauseOption(_)`, as in `run`), and the
// closing delimiters. TODO restore.
// NOTE(review): when `last` yields `None`, `done` is not set, so a further pull re-pulls the
// already-ended upstream and re-runs `last` — confirm this is intended.
def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
ZStream( {
case ((pull, done), (step, last)) =>
val pipe = pull.foldCauseM(
.fold(last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(done.set(true).as(_))))(Pull.halt),
step(_).foldCauseM(Pull.halt, Pull.emit)
ZIO.ifM(done.get)(Pull.end, pipe)
// NOTE(review): the body of `aggregate` is missing from this listing — TODO restore.
def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
// NOTE(review): not implemented yet — `???` throws NotImplementedError when called.
def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ???
// First element of the stream; a `None` failure presumably means the stream was empty.
// NOTE(review): the body of `head` is missing from this listing — TODO restore.
def head: ZIO[R, Option[E], I] =
// Runs this stream into `sink` inside their combined managed scope. It repeatedly pulls an element
// and pushes it into the sink. On end-of-stream (a `None` failure) it returns the sink's `done`
// result; any other failure halts with the underlying cause.
// NOTE(review): the call to `go` and the closing braces are missing from this listing — TODO restore.
def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] =
(pull <*> sink.process).use {
case (pull, (push, done)) =>
def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go)
// A transducer: a managed pair of (`step`, mapping each input to exactly one output; `last`, an
// effect run at end of input that may produce one final output).
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) {
// Composes two transducers. Each input goes through this `step` and then through `transducer`'s
// step. At end of input, this `last` runs first: `None` falls through to `transducer`'s `last`,
// and `Some(o)` is fed to `transducer`'s step before its `last` runs.
// NOTE(review): `rs(_) *> ro` discards the output `rs` produces for that final `o`; only `ro`'s
// result is returned, so the downstream value for the final element is lost. `last` has the shape
// `ZIO[R, E, Option[O]]` and can carry at most one value, so a fix likely needs a different type.
def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] =
ZTransducer(process.zipWith(transducer.process) {
case ((ls, lo), (rs, ro)) =>
(ls.andThen(_.flatMap(rs)), lo.flatMap(_.fold(ro)(rs(_) *> ro)))
// Attaches this transducer in front of `sink`. Each input's `step` output is pushed into the sink.
// At end of input, the final value from `last` (if any) is pushed before the sink's `done` runs.
def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] =
ZSink(process.zipWith(sink.process) {
case ((step, last), (push, done)) =>
(step.andThen(_.flatMap(push)), last.flatMap(_.fold(done)(push(_) *> done)))
// Lifts this transducer to chunks by applying `step` to every element of an incoming chunk.
// NOTE(review): the value this `ZTransducer(` call maps over, the handling of `last` and the
// closing delimiters are missing from this listing — TODO restore.
def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] =
ZTransducer( {
case (step, last) =>
((chunk: Chunk[I]) => ZIO.foreach(chunk)(step),
// Constructors for `Pull` effects.
object Pull {
// Signals end-of-stream.
// NOTE(review): the right-hand side is missing from this listing (presumably `ZIO.fail(None)`,
// the only typed failure `Option[Nothing]` admits) — TODO restore.
val end: IO[Option[Nothing], Nothing] =
// Emits `a` as the next element.
def emit[A](a: A): UIO[A] = ZIO.succeedNow(a)
// Fails the pull with `cause`; presumably the errors are wrapped in `Some` so that they are not
// mistaken for end-of-stream.
// NOTE(review): the argument of `ZIO.halt(` is truncated in this listing — TODO restore.
def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(
object ZSink {
/** Sink that prints every incoming string on its own console line; its final result is `()`. */
val putStrLn: ZSink[console.Console, Nothing, String, Unit] = {
  val printLine = (line: String) => console.putStrLn(line)
  apply(ZManaged.succeedNow((printLine, ZIO.unit)))
}
/** Wraps a managed `(push, done)` pair as a [[ZSink]]. */
def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] =
  new ZSink[R, E, I, Z](process)
/**
 * Sink that threads an accumulator, starting at `z`, through every pushed input using the
 * effectful step `f`, and yields the final accumulator as its result.
 */
def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] =
  apply(ZRef.makeManaged(z).map { acc =>
    val push = (i: I) => acc.get.flatMap(current => f(i, current)).flatMap(acc.set)
    (push, acc.get)
  })
/**
 * Sink that keeps the first pushed element and ignores everything after it. It yields `None` if
 * nothing was pushed.
 *
 * The stream is still consumed to the end; this sink has no way to signal early termination.
 * (The previous version guarded the write with an `empty` flag that was never cleared, so every
 * push overwrote the slot and the sink actually returned the last element.)
 */
def head[A]: ZSink[Any, Nothing, A, Option[A]] =
  apply(ZRef.makeManaged(Option.empty[A]).map { first =>
    // Only the first push fills the slot; later pushes leave it unchanged.
    val keepFirst = (a: A) => first.update(_.orElse(Some(a))).unit
    (keepFirst, first.get)
  })
/** Sink that remembers only the most recently pushed element (`None` if nothing was pushed). */
def last[A]: ZSink[Any, Nothing, A, Option[A]] =
  apply(ZRef.makeManaged[Option[A]](None).map { latest =>
    val remember = (a: A) => latest.set(Some(a))
    (remember, latest.get)
  })
// Sink intended to write incoming byte chunks to `os`. The meaning of the `Int` result is not
// shown here (possibly a byte count — TODO confirm).
// NOTE(review): not implemented yet — `???` throws NotImplementedError when this is called.
def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ???
object ZStream {
/** Wraps a managed pull effect as a [[ZStream]]. */
def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] =
  new ZStream[R, E, I](pull)
// Stream presumably emitting the elements of `chunk` one at a time.
// NOTE(review): not implemented yet — `???` throws NotImplementedError when called.
def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ???
// Stream of byte chunks read from `is`. Chunk size and whether `is` gets closed are not specified
// here — TODO decide.
// NOTE(review): not implemented yet — `???` throws NotImplementedError when called.
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ???
object ZTransducer {
// Splits incoming strings into lines on the platform line separator, read from `system.System`.
// Text after the last separator is carried over and prepended to the next input. Each step
// presumably emits the complete lines found so far as one chunk. At end of input, any non-empty
// carried-over text is emitted as a final single-line chunk.
// NOTE(review): several lines are missing here: the value being zipped (presumably a Ref holding
// the carried-over text), the code that reads the Ref into `l`, stores `rem` back and emits
// `cb.result()`, the `last` wrapper around `s =>`, and the closing braces. TODO restore.
// NOTE(review): splitting on the platform separator means input written with another convention
// (e.g. "\r\n" text on a "\n" platform) keeps a stray '\r' or is not split at all — confirm intended.
val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] =
.zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) =>
// Separator length, skipped after each match.
val di = sep.length
((s: String) =>
l =>
ZIO.effectSuspendTotal {
val cb = ChunkBuilder.make[String]()
// Carried-over text followed by the new input.
var rem = l ++ s
var i = rem.indexOf(sep)
// Move each complete line into the builder; `rem` ends up holding the unterminated tail.
while (i != -1) {
cb += rem.take(i)
rem = rem.drop(i + di)
i = rem.indexOf(sep)
// End of input: emit any leftover text as a final line.
s =>
if (s.isEmpty) None
else Option(Chunk.single(s))
/**
 * Decodes a stream of byte chunks as UTF-8 text, emitting one `String` per input chunk.
 *
 * A multi-byte character may be split across two chunks. Decoding each chunk in isolation (as the
 * previous version did) turns both halves into U+FFFD replacement characters. Instead, the bytes
 * of a trailing incomplete sequence are held back and prepended to the next chunk. Any bytes still
 * held back when the input ends are decoded (malformed bytes become U+FFFD) and emitted as the
 * final value.
 */
val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] = {
  // Total byte length of a UTF-8 sequence, judged from its lead byte.
  // Invalid lead bytes count as 1, so they are decoded (as U+FFFD) rather than held back.
  def sequenceLength(lead: Byte): Int =
    if ((lead & 0xE0) == 0xC0) 2
    else if ((lead & 0xF0) == 0xE0) 3
    else if ((lead & 0xF8) == 0xF0) 4
    else 1

  // Number of leading bytes that can be decoded without cutting the final sequence short.
  // Only the last (at most 4) bytes are inspected.
  def completePrefixLength(bytes: Array[Byte]): Int = {
    val n = bytes.length
    val isContinuation = (b: Byte) => (b & 0xC0) == 0x80
    // Look back over at most three continuation bytes (10xxxxxx) to the lead byte of the last sequence.
    val leadIndex = (n - 1 to math.max(n - 4, 0) by -1).find(i => !isContinuation(bytes(i)))
    leadIndex match {
      case Some(i) if n - i < sequenceLength(bytes(i)) => i
      case _                                           => n
    }
  }

  apply[Any, Nothing, Chunk[Byte], String](
    ZRef.makeManaged(Chunk.empty: Chunk[Byte]).map { pending =>
      val step = (chunk: Chunk[Byte]) =>
        pending.modify { held =>
          val bytes = (held ++ chunk).toArray
          val cut   = completePrefixLength(bytes)
          (new String(bytes, 0, cut, StandardCharsets.UTF_8), Chunk.fromArray(bytes.drop(cut)))
        }
      val last = pending.getAndSet(Chunk.empty).map { held =>
        if (held.isEmpty) None else Some(new String(held.toArray, StandardCharsets.UTF_8))
      }
      (step, last)
    }
  )
}
/** Wraps a managed `(step, last)` pair as a [[ZTransducer]]. */
def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] =
  new ZTransducer[R, E, I, O](process)
/**
 * Splits each incoming chunk into consecutive sub-chunks of at most `max` elements; the final
 * sub-chunk of each input may be shorter. Emits nothing extra at end of input.
 * `max` presumably must be positive (collection `grouped` rejects non-positive sizes) — confirm.
 */
def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] =
  succeed { (chunk: Chunk[A]) =>
    // TODO use ChunkBuilder
    val groups = chunk.grouped(max).toList
    ZIO.succeedNow(Chunk.fromIterable(groups))
  }
// Regroups incoming chunks into sub-chunks of exactly `size` elements. Elements that do not fill
// a complete group are carried over in a Ref and prepended to the next input. At end of input, a
// non-empty remainder is passed to `pad`, whose result becomes the final emission.
// NOTE(review): the lines creating the Ref (`.makeManaged` has lost its receiver) and wiring the
// `ref =>` / `rem =>` lambdas are partly missing from this listing — TODO restore.
// NOTE(review): `size` presumably must be positive; `grouped` rejects non-positive sizes.
def chunkN[R, E, A](
size: Int,
pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]]
): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] =
.makeManaged(Chunk.empty: Chunk[A])
ref =>
(chunk =>
rem =>
ZIO.effectSuspendTotal {
val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder
// A full final group means nothing is carried over; otherwise keep it back for the next input.
// NOTE(review): if `rem ++ chunk` is empty (empty input, nothing carried over), `cs` is empty and
// `cs.last` throws; consider emitting `Chunk.empty` in that case.
if (cs.last.length == size) ref.set(Chunk.empty).as(cs) else ref.set(cs.last).as(cs.init)
rem =>
if (rem.isEmpty) ZIO.none
else pad(rem)
// Transducer that presumably passes each input through unchanged (its `A => A` shape admits
// little else).
// NOTE(review): the body of this definition is missing from the listing — TODO restore.
def identity[A]: ZTransducer[Any, Nothing, A, A] =
/**
 * Builds a transducer that needs no managed resources from a `step` function and a final
 * emission effect `last`, which defaults to emitting nothing (`ZIO.none`).
 */
def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] = {
  val pair = (step, last)
  apply(ZManaged.succeedNow(pair))
}
Copy link

ajaychandran commented Jun 4, 2020

This redesign addresses two separate concerns with the current encoding:

  1. Forced wrapping of values in a Chunk.
  2. Wrapping each chunk in an Option at every step in the pipeline.

The concerns are addressed by

  1. Eliminating the internal chunked encoding. Instead, a Chunk is treated as a normal value and appears in signatures such as:
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]]
def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int]
  2. Separating the push and step operations of ZSink and ZTransducer, respectively, from the back-propagation of the final result:
type Push[-R, +E, -I]     = I => ZIO[R, E, Unit]
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O]

ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])])
ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])])

Copy link

ajaychandran commented Jun 4, 2020

Note that ZStream.head is defined as

    def head: ZIO[R, Option[E], I] =

Extracting only the first element is not a stream-processing operation; this use case treats ZStream as a collection.

To run the whole stream and return the head, run the stream with ZSink.head.

Copy link

iravid commented Jun 4, 2020

Thank you for suggesting this! The previous ZStream encoding did not use chunks internally, and it was a conscious decision to extensively embed chunks so that users can enjoy their performance improvements without using them explicitly.

Copy link

I agree that acceptable performance can only be achieved through chunking.
The part I disagree with is that the library has to force this on every stream instead of chunking only when required.

ZStream[R, E, Byte] looks better than ZStream[R, E, Chunk[Byte]], but what utility does it add?

I understand that you are busy and that this may not be convincing enough.
I also know that the streams encoding was changed recently and this must have been an exhausting effort.

If I can come up with some benchmarks comparing the two encodings, would you be open to continuing this discussion?

I have created a branch with a basic implementation and a test.
This is what the new test for chunkN looks like:

          testM("pads last element to size")(
            checkM(Gen.chunkOfBounded(0, 5)(Gen.chunkOf(Gen.anyInt)))(chunk =>
                .run(ZTransducer.chunkN[Int](3, 0) >>> ZSink.collect[Int].chunked.chunked)
                .map(xs => assert(xs)(equalTo(chunk.flatten.padTo(xs.length, 0))))

Note the double use of the chunked combinator on ZSink.collect[Int]. The best part is that the new encoding ensures that no intermediate chunks are created while collecting!

Copy link


Or I could open a WIP pull request?

Copy link

iravid commented Jun 5, 2020

Always happy to see additional approaches! However the use of chunks is non-negotiable at this point. Can be reconsidered for 2.0.

Copy link

Thanks @iravid.
I will continue working on this and bug you occasionally :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment