Skip to content

Instantly share code, notes, and snippets.

@ajaychandran
Last active June 5, 2020 08:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Alternative encoding for ZIO stream abstractions
package zio
import java.io.{ InputStream, OutputStream }
import java.nio.charset.StandardCharsets
object streaming {
/**
 * An effect that produces the next element `I` of a stream.
 * The error channel is `Option[E]`: `Pull.end` (defined in this file) fails with `None`,
 * and `Pull.halt` wraps real errors in an `Option`, so `None` is the end-of-stream signal.
 */
type Pull[-R, +E, +I] = ZIO[R, Option[E], I]
/** A function that consumes one input `I` effectfully and produces no value. */
type Push[-R, +E, -I] = I => ZIO[R, E, Unit]
/** A function that consumes one input `I` effectfully and produces one output `O`. */
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O]
/**
 * Example pipeline: streams byte chunks from `is`, decodes them as UTF-8,
 * splits the text into lines and prints every line to the console.
 *
 * NOTE: `ZStream.fromInputStream` is still a `???` stub in this file, so running
 * this example currently fails at that call.
 */
def example1(is: InputStream): URIO[ZEnv, Unit] = {
  // bytes -> text -> chunks of lines
  val toLines = ZTransducer.utf8Decode >>> ZTransducer.newLines
  // print each line of every emitted chunk
  val printLines = toLines >>> ZSink.putStrLn.chunked
  ZStream.fromInputStream(is).run(printLines)
}
/**
 * A consumer of `I` values that eventually yields a result `Z`.
 *
 * @param process a managed pair of (push, done): `push` consumes one input,
 *                `done` is the effect that produces the sink's result.
 */
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) {

  /**
   * Adapts this sink to consume whole chunks: every element of an incoming
   * chunk is pushed, in order, into the underlying sink. The result effect is unchanged.
   */
  def chunked: ZSink[R, E, Chunk[I], Z] =
    ZSink(process.map { pair =>
      val (pushOne, result) = pair
      ((chunk: Chunk[I]) => ZIO.foreach_(chunk)(pushOne), result)
    })
}
/**
 * A stream of `I` values described by a managed `Pull`.
 *
 * @param pull a managed effect; each evaluation of the inner `Pull` yields the next element,
 *             or fails (with `None` as the end-of-stream signal, see `Pull.end`).
 */
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) {
/**
 * Pipes every element of this stream through `transducer`.
 *
 * A managed `done` flag (initially `false`) is allocated next to the upstream pull and the
 * transducer's (step, last) pair. Each downstream pull:
 *  - ends immediately if `done` is already set;
 *  - otherwise pulls upstream and, on success, runs `step` on the element and emits its output;
 *  - on an upstream failure that `Cause.sequenceCauseOption` maps to `None` (presumably
 *    end-of-stream), runs `last`: a `Some(a)` sets `done` and emits `a`, a `None` ends the stream;
 *  - any other failure cause (from upstream, `step` or `last`) is propagated via `Pull.halt`.
 */
def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) {
case ((pull, done), (step, last)) =>
// One pull of the transduced stream, ignoring the `done` flag (checked below).
val pipe = pull.foldCauseM(
Cause
.sequenceCauseOption(_)
.fold(last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(done.set(true).as(_))))(Pull.halt),
step(_).foldCauseM(Pull.halt, Pull.emit)
)
// After the flushed `last` value has been emitted, every further pull ends the stream.
ZIO.ifM(done.get)(Pull.end, pipe)
})
/** Alias for `>>>` with a transducer. */
def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
>>>(transducer)
/** Not implemented yet (`???`): calling it throws `NotImplementedError`. */
def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ???
/**
 * Acquires the pull, evaluates it once and releases it, returning that single result.
 * The error type is `Option[E]`; by the `Pull` convention `None` means the stream had no element.
 */
def head: ZIO[R, Option[E], I] =
pull.use(identity)
/**
 * Runs the whole stream into `sink` and returns the sink's result.
 *
 * Elements are pulled and pushed one at a time. When the pull fails with a cause that
 * `Cause.sequenceCauseOption` maps to `None`, the sink's `done` effect supplies the result;
 * any other cause is re-raised with `ZIO.halt`.
 */
def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] =
(pull <*> sink.process).use {
case (pull, (push, done)) =>
// Pull/push loop: recurses after every successfully pushed element.
def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go)
go
}
}
/**
 * A stream transformer from `I` to `O`.
 *
 * @param process a managed pair of (step, last): `step` maps one input to one output,
 *                `last` is run when the input is exhausted and may produce one final output.
 */
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) {

  /**
   * Composes this transducer with a downstream `transducer`.
   *
   * The composed `step` feeds the output of this step into the downstream step. The composed
   * `last` flushes this transducer first; a flushed value is stepped through the downstream
   * transducer before the downstream `last` is run.
   *
   * NOTE(review): the downstream step's output for the flushed value is discarded by `*>`;
   * only the downstream `last` value is returned. Confirm whether dropping it is intended.
   */
  def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] =
    ZTransducer(process.zipWith(transducer.process) { (upstream, downstream) =>
      val (upStep, upLast)     = upstream
      val (downStep, downLast) = downstream
      val step: Step[R1, E1, I, A] = i => upStep(i).flatMap(downStep)
      val last: ZIO[R1, E1, Option[A]] =
        upLast.flatMap(flushed => flushed.fold(downLast)(o => downStep(o) *> downLast))
      (step, last)
    })

  /**
   * Composes this transducer with a `sink`, producing a sink of this transducer's inputs.
   *
   * Each input is stepped and the output pushed into `sink`. On completion this transducer
   * is flushed; a flushed value is pushed into the sink before the sink's result is taken.
   */
  def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] =
    ZSink(process.zipWith(sink.process) { (upstream, downstream) =>
      val (step, last) = upstream
      val (push, done) = downstream
      val pushThrough: Push[R1, E1, I] = i => step(i).flatMap(push)
      val finish: ZIO[R1, E1, Z] =
        last.flatMap(flushed => flushed.fold(done)(o => push(o) *> done))
      (pushThrough, finish)
    })

  /**
   * Lifts this transducer to work on chunks: `step` is applied to every element of an
   * incoming chunk, and a flushed final value is wrapped in a single-element chunk.
   */
  def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] =
    ZTransducer(process.map { pair =>
      val (stepOne, flush) = pair
      ((chunk: Chunk[I]) => ZIO.foreach(chunk)(stepOne), flush.map(_.map(Chunk.single)))
    })
}
/** Constructors for the three possible outcomes of a `Pull`. */
object Pull {

  /** Signals end-of-stream: fails with `None`. */
  val end: IO[Option[Nothing], Nothing] = IO.fail(None)

  /** Emits `a` as the next element. */
  def emit[A](a: A): UIO[A] = ZIO.succeedNow(a)

  /** Fails with `cause`, wrapping every error value in an `Option`. */
  def halt[E](cause: Cause[E]): IO[Option[E], Nothing] =
    IO.halt(cause.map(e => Option(e)))
}
/** Constructors for [[ZSink]]. */
object ZSink {

  /** A sink that prints every pushed string to the console and results in `Unit`. */
  val putStrLn: ZSink[console.Console, Nothing, String, Unit] =
    apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit)))

  /** Wraps a managed (push, done) pair in a [[ZSink]]. */
  def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] =
    new ZSink(process)

  /**
   * A sink that folds every input into an accumulator starting at `z`.
   *
   * @param z initial accumulator value
   * @param f effectful combination of the next input with the current accumulator
   * @return a sink whose result is the accumulator after the last input
   *
   * The accumulator is read and written in two separate `Ref` operations, so the update
   * is not atomic with respect to concurrent pushes.
   */
  def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] =
    apply(ZRef.makeManaged(z).map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get)))

  /**
   * A sink that results in the first pushed element, or `None` if nothing was pushed.
   *
   * The element is stored only while the slot is still empty. The previous version guarded
   * the write with a flag that was never cleared, so each push overwrote the stored value
   * and the sink returned the last element instead of the first.
   */
  def head[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(
      ZRef
        .makeManaged(Option.empty[A])
        .map(ref => ((a: A) => ref.set(Some(a)).whenM(ref.get.map(_.isEmpty)), ref.get))
    )

  /** A sink that results in the most recently pushed element, or `None` if nothing was pushed. */
  def last[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(ZRef.makeManaged(Option.empty[A]).map(ref => (a => ref.set(Some(a)), ref.get)))

  /** Not implemented yet (`???`): calling it throws `NotImplementedError`. */
  def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ???
}
/** Constructors for [[ZStream]]. */
object ZStream {
/** Wraps a managed `Pull` in a [[ZStream]]. */
def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] =
new ZStream(pull)
/** Not implemented yet (`???`): calling it throws `NotImplementedError`. */
def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ???
/** Not implemented yet (`???`): calling it throws `NotImplementedError`. */
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ???
}
/** Constructors for [[ZTransducer]]. */
object ZTransducer {

  /**
   * Splits incoming text into lines using the system line separator.
   *
   * Each step emits the chunk of complete lines found so far; text after the last separator
   * is buffered and prepended to the next input, so separators spanning two inputs are
   * handled. On completion a non-empty buffer is flushed as a single-line chunk.
   */
  val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] =
    apply(
      ZRef
        .makeManaged("")
        .zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) =>
          val di = sep.length
          ((s: String) =>
             ref.get.flatMap(
               l =>
                 ZIO.effectSuspendTotal {
                   val cb    = ChunkBuilder.make[String]()
                   val text  = l + s
                   // Scan with a moving start offset instead of re-copying the remainder per line.
                   var start = 0
                   var i     = text.indexOf(sep)
                   while (i != -1) {
                     cb += text.substring(start, i)
                     start = i + di
                     i = text.indexOf(sep, start)
                   }
                   ref.set(text.substring(start)).as(cb.result())
                 }
             ),
           ref
             .getAndSet("")
             .map(
               s =>
                 if (s.isEmpty) None
                 else Option(Chunk.single(s))
             ))
        }
    )

  /**
   * Decodes byte chunks as UTF-8 text.
   *
   * A multi-byte character may be split across two chunks, so an incomplete trailing
   * sequence is held back and prepended to the next chunk instead of being decoded
   * (and corrupted) on its own. Bytes still pending when the input ends are decoded
   * as-is by `last`. Malformed input is handled by `String`'s decoder.
   */
  val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] =
    apply[Any, Nothing, Chunk[Byte], String](
      ZRef.makeManaged(Array.emptyByteArray).map { pending =>
        val step: Step[Any, Nothing, Chunk[Byte], String] = chunk =>
          pending.get.flatMap { carry =>
            ZIO.effectSuspendTotal {
              val bytes = carry ++ chunk.toArray[Byte]
              val cut   = utf8CompleteLength(bytes)
              val text  = new String(bytes, 0, cut, StandardCharsets.UTF_8)
              pending.set(bytes.drop(cut)).as(text)
            }
          }
        val last: ZIO[Any, Nothing, Option[String]] =
          pending.getAndSet(Array.emptyByteArray).map { rest =>
            if (rest.isEmpty) None else Some(new String(rest, StandardCharsets.UTF_8))
          }
        (step, last)
      }
    )

  /**
   * Returns the length of the longest prefix of `bytes` that does not end inside a
   * multi-byte UTF-8 sequence. Only the last (up to) four bytes are inspected.
   */
  private def utf8CompleteLength(bytes: Array[Byte]): Int = {
    val len  = bytes.length
    var i    = len - 1
    var seen = 0
    // Walk back over at most three trailing continuation bytes (10xxxxxx).
    while (i >= 0 && seen < 3 && (bytes(i) & 0xC0) == 0x80) {
      i -= 1
      seen += 1
    }
    if (i < 0) len // no lead byte found: nothing to hold back
    else {
      val b = bytes(i) & 0xFF
      val need =
        if ((b & 0x80) == 0x00) 1
        else if ((b & 0xE0) == 0xC0) 2
        else if ((b & 0xF0) == 0xE0) 3
        else if ((b & 0xF8) == 0xF0) 4
        else 1 // malformed lead byte: leave it to the decoder
      if (i + need > len) i else len
    }
  }

  /** Wraps a managed (step, last) pair in a [[ZTransducer]]. */
  def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] =
    new ZTransducer(process)

  /** Splits every incoming chunk into chunks of at most `max` elements. */
  def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] =
    succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder

  /**
   * Regroups incoming chunks into chunks of exactly `size` elements.
   *
   * Elements that do not fill a complete group are buffered and prepended to the next input.
   * On completion a non-empty remainder is handed to `pad`, which decides what is emitted.
   *
   * @param size group size; presumably must be positive, since it is passed to `grouped`
   * @param pad  turns the final incomplete group into the last output, if any
   */
  def chunkN[R, E, A](
    size: Int,
    pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]]
  ): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[A])
        .map(
          ref =>
            ((chunk: Chunk[A]) =>
               ref.get.flatMap(
                 rem =>
                   ZIO.effectSuspendTotal {
                     val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder
                     // An empty input with no buffered remainder yields no groups; `cs.last`
                     // would fail here, and the buffer is already empty, so emit nothing.
                     if (cs.isEmpty) ZIO.succeedNow(cs)
                     else if (cs.last.length == size) ref.set(Chunk.empty).as(cs)
                     else ref.set(cs.last).as(cs.init)
                   }
               ),
             ref
               .getAndSet(Chunk.empty)
               .flatMap(
                 rem =>
                   if (rem.isEmpty) ZIO.none
                   else pad(rem)
               ))
        )
    )

  /** A transducer that emits every input unchanged. */
  def identity[A]: ZTransducer[Any, Nothing, A, A] =
    succeed(ZIO.succeedNow)

  /**
   * Builds a transducer from a `step` function and an optional `last` effect,
   * without any managed state. `last` defaults to producing no final output.
   */
  def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] =
    apply(ZManaged.succeedNow(step -> last))
}
}
@ajaychandran
Copy link
Author

ajaychandran commented Jun 4, 2020

This re-design addresses 2 separate concerns with the current encoding:

  1. Forced wrapping of values in a Chunk.
  2. Wrapping each chunk in an Option at every step in the pipeline.

The concerns are addressed by

  1. Eliminating the internal chunked encoding. Instead, a Chunk is treated as a normal value and appears in signatures like
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]]
def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int]
  2. Separating push and step operations in ZSink and ZTransducer, respectively, from the back-propagation of the final result.
type Push[-R, +E, -I]     = I => ZIO[R, E, Unit]
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O]

ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])])
ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])])

@ajaychandran
Copy link
Author

ajaychandran commented Jun 4, 2020

Note that ZStream.head is defined as

    def head: ZIO[R, Option[E], I] =
      pull.use(identity)

Extracting only the first element is not a stream processing operation. This use case treats ZStream as a collection.

To run the whole stream and return the head, run the stream with ZSink.head.

@iravid
Copy link

iravid commented Jun 4, 2020

Thank you for suggesting this! The previous ZStream encoding did not use chunks internally, and it was a conscious decision to extensively embed chunks so that users can enjoy their performance improvements without using them explicitly.

@ajaychandran
Copy link
Author

@iravid
I agree that acceptable performance can only be achieved through chunking.
The part I disagree with is that the library has to force this on every stream instead of chunking only when required.

ZStream[R, E, Byte] looks better than ZStream[R, E, Chunk[Byte]] but what utility does it add?

I understand that you are busy and that this may not be convincing enough.
I also know that the streams encoding was changed recently and this must have been an exhausting effort.

If I can come up with some benchmarks for the encodings, would you be open to continuing this discussion?

I have created a branch with a basic implementation and a test.
This is what the new test for chunkN looks like.

          testM("pads last element to size")(
            checkM(Gen.chunkOfBounded(0, 5)(Gen.chunkOf(Gen.anyInt)))(chunk =>
              ZStream
                .fromChunk(chunk)
                .run(ZTransducer.chunkN[Int](3, 0) >>> ZSink.collect[Int].chunked.chunked)
                .map(xs => assert(xs)(equalTo(chunk.flatten.padTo(xs.length, 0))))
            )
          )

Note the double use of the chunked combinator on ZSink.collect[Int]. The best part is that the new encoding ensures that no intermediate chunks are created while collecting!

@ajaychandran
Copy link
Author

@iravid

Or I could open a WIP pull request?

@iravid
Copy link

iravid commented Jun 5, 2020

Always happy to see additional approaches! However the use of chunks is non-negotiable at this point. Can be reconsidered for 2.0.

@ajaychandran
Copy link
Author

Thanks @iravid.
I will continue working on this and bug you occasionally :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment