Last active
June 5, 2020 08:00
-
-
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Alternative encoding for ZIO stream abstractions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio | |
import java.io.{ InputStream, OutputStream } | |
import java.nio.charset.StandardCharsets | |
object streaming { | |
/** One pull from a stream: yields an `I`, fails with `Some(e)` on error, or with `None` once the stream has ended. */
type Pull[-R, +E, +I] = ZIO[R, Option[E], I]

/** A consumer callback: accepts one `I` and runs an effect. */
type Push[-R, +E, -I] = Function1[I, ZIO[R, E, Unit]]

/** An effectful transformation of one input `I` into exactly one output `O`. */
type Step[-R, +E, -I, +O] = Function1[I, ZIO[R, E, O]]
/**
 * Demo pipeline: reads bytes from `is`, decodes them as UTF-8, splits the text on the
 * system line separator and prints each resulting line to the console.
 */
def example1(is: InputStream): URIO[ZEnv, Unit] = {
  val source      = ZStream.fromInputStream(is)
  val decodeLines = ZTransducer.utf8Decode >>> ZTransducer.newLines
  source.run(decodeLines >>> ZSink.putStrLn.chunked)
}
/**
 * A consumer of `I` values that produces a final `Z`.
 *
 * `process` is a managed resource yielding a `push` callback, invoked once per element,
 * and a `done` effect that computes the result once input is exhausted.
 */
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) {

  /** Lifts this sink to consume whole chunks, pushing their elements one at a time, in order. */
  def chunked: ZSink[R, E, Chunk[I], Z] = {
    val lifted = process.map { pair =>
      val (push, done) = pair
      val pushAll: Push[R, E, Chunk[I]] = (chunk: Chunk[I]) => ZIO.foreach_(chunk)(push)
      (pushAll, done)
    }
    ZSink(lifted)
  }
}
/**
 * A pull-based stream of `I` values.
 *
 * `pull` is a managed resource yielding an effect that, each time it runs, produces the next
 * element, fails with `Some(e)` on error, or fails with `None` once the stream has ended.
 */
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) {

  /**
   * Feeds every element of this stream through `transducer`'s step.
   *
   * When this stream ends, the transducer's `last` effect runs once and its value (if any) is
   * emitted as a final element. From then on the resulting stream only reports end of stream:
   * the exhausted upstream is not pulled again and `last` is not re-run, whether `last`
   * returned a value, returned `None`, or failed.
   */
  def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
    ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) {
      case ((pull, done), (step, last)) =>
        // Set `done` before running `last`. Previously it was only set when `last` yielded
        // Some, so a None or a failure let later pulls re-pull the ended upstream and re-run
        // `last` (and whatever side effects it has).
        val finish: Pull[R1, E1, A] =
          done.set(true) *> last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(Pull.emit))
        val pipe = pull.foldCauseM(
          Cause
            .sequenceCauseOption(_)
            .fold(finish)(Pull.halt),
          step(_).foldCauseM(Pull.halt, Pull.emit)
        )
        ZIO.ifM(done.get)(Pull.end, pipe)
    })

  /** Alias for `>>>`. */
  def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
    >>>(transducer)

  /** Not yet implemented (`???`). */
  def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ???

  /** Acquires the pull, runs it once and releases it: the first element, or failure with `None` if the stream is empty. */
  def head: ZIO[R, Option[E], I] =
    pull.use(identity)

  /**
   * Pushes every element of this stream into `sink` and returns the sink's `done` result once
   * the stream ends. A stream failure aborts the run with that failure.
   */
  def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] =
    (pull <*> sink.process).use {
      case (pull, (push, done)) =>
        // None (end of stream) -> the sink's result; Some(cause) -> propagate the failure.
        def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go)
        go
    }
}
/**
 * An effectful element-wise transformation from `I` to `O`.
 *
 * `process` is a managed resource yielding a `step`, which maps each input to exactly one
 * output, and a `last` effect that runs at end of input and may yield one final output.
 */
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) {

  /**
   * Composes this transducer with `transducer`, feeding each output of this one into the other.
   *
   * At end of input, this transducer's final output (if any) goes through the right-hand step,
   * and then the right-hand `last` runs. Because `last` can yield at most one value, the
   * right-hand `last` result is returned when it has one. Otherwise the right-hand step's output
   * for that final element is returned, so it is not lost.
   */
  def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] =
    ZTransducer(process.zipWith(transducer.process) {
      case ((ls, lo), (rs, ro)) =>
        val composedLast: ZIO[R1, E1, Option[A]] =
          lo.flatMap {
            case None => ro
            case Some(o) =>
              // Keep `rs(o)` as a fallback. Discarding it (`rs(o) *> ro`) drops the final
              // element whenever the right-hand side has nothing to flush, e.g. `identity`.
              rs(o).flatMap(a => ro.map(_.orElse(Some(a))))
          }
        (ls.andThen(_.flatMap(rs)), composedLast)
    })

  /**
   * Places this transducer in front of `sink`. At end of input, this transducer's final output
   * (if any) is pushed into the sink before the sink's `done` result is computed.
   */
  def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] =
    ZSink(process.zipWith(sink.process) {
      case ((step, last), (push, done)) =>
        (step.andThen(_.flatMap(push)), last.flatMap(_.fold(done)(push(_) *> done)))
    })

  /** Lifts this transducer to whole chunks by stepping each element in order; the final output becomes a singleton chunk. */
  def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] =
    ZTransducer(process.map {
      case (step, last) =>
        ((chunk: Chunk[I]) => ZIO.foreach(chunk)(step), last.map(_.map(Chunk.single)))
    })
}
/** Constructors for `Pull` effects. */
object Pull {

  /** Signals end of stream by failing with `None`. */
  val end: IO[Option[Nothing], Nothing] = ZIO.fail(None)

  /** Emits a single element. */
  def emit[A](a: A): UIO[A] = ZIO.succeedNow(a)

  /**
   * Fails the pull with `cause`, wrapping each error in `Some` so it stays distinct from end of stream.
   *
   * Uses `Some(_)` rather than `Option.apply`: the latter turns a `null` error into `None`,
   * which consumers would treat as a normal end of stream.
   */
  def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(cause.map(Some(_)))
}
/** Constructors for `ZSink`. */
object ZSink {

  /** Prints each string element with `console.putStrLn`; the result is `Unit`. */
  val putStrLn: ZSink[console.Console, Nothing, String, Unit] =
    apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit)))

  /** Wraps a managed `(push, done)` pair as a sink. */
  def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] =
    new ZSink(process)

  /** Folds elements left to right with the effectful `f`, starting from `z`; the result is the final state. */
  def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] =
    apply(ZRef.makeManaged(z).map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get)))

  /** Captures the first element pushed, or `None` if nothing was pushed. */
  def head[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(
      ZRef
        .makeManaged(Option.empty[A])
        .zipWith(ZRef.makeManaged(true)) { (ref, empty) =>
          // Clear `empty` once the first element is stored so later elements are ignored.
          // Without this, every element overwrote `ref` and the sink behaved like `last`.
          val push = (a: A) => (ref.set(Some(a)) *> empty.set(false)).whenM(empty.get)
          (push, ref.get)
        }
    )

  /** Captures the most recently pushed element, or `None` if nothing was pushed. */
  def last[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(ZRef.makeManaged(Option.empty[A]).map(ref => ((a: A) => ref.set(Some(a)), ref.get)))

  /** Not yet implemented (`???`). */
  def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ???
}
/** Constructors for `ZStream`. */
object ZStream {

  /** Stream of byte chunks read from `is`. Not yet implemented: calling it throws `NotImplementedError`. */
  def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ???

  /** Stream of the elements of `chunk`. Not yet implemented: calling it throws `NotImplementedError`. */
  def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ???

  /** Wraps a managed pull as a stream. */
  def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] =
    new ZStream[R, E, I](pull)
}
/** Constructors for `ZTransducer`. */
object ZTransducer {

  /**
   * Splits strings on the line separator provided by `system.System`.
   *
   * Each step emits the complete lines found so far (possibly none) and buffers the trailing
   * partial line, so a separator split across inputs is still recognised. At end of input a
   * non-empty buffered remainder is emitted as a final singleton chunk.
   */
  val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] =
    apply(
      ZRef
        .makeManaged("")
        .zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) =>
          val step = (s: String) =>
            ref.get.flatMap { pending =>
              ZIO.effectSuspendTotal {
                val text  = pending ++ s
                val lines = ChunkBuilder.make[String]()
                // Scan by index. Repeatedly dropping the consumed prefix copied the remaining
                // text once per line, which is quadratic. An empty separator would match at
                // every position forever, so never split on it.
                var start = 0
                var hit   = if (sep.isEmpty) -1 else text.indexOf(sep)
                while (hit != -1) {
                  lines += text.substring(start, hit)
                  start = hit + sep.length
                  hit = text.indexOf(sep, start)
                }
                ref.set(text.substring(start)).as(lines.result())
              }
            }
          val last = ref.getAndSet("").map(rest => if (rest.isEmpty) None else Some(Chunk.single(rest)))
          (step, last)
        }
    )

  /**
   * Decodes UTF-8 byte chunks into strings.
   *
   * A multi-byte character may be split across two chunks. The incomplete trailing bytes of each
   * chunk are held back and prepended to the next chunk, rather than decoded on their own (which
   * would produce U+FFFD replacement characters). Bytes still held back at end of input are
   * decoded as they are and emitted as a final string.
   */
  val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[Byte])
        .map { carry =>
          val step = (chunk: Chunk[Byte]) =>
            carry.get.flatMap { held =>
              ZIO.effectSuspendTotal {
                val bytes = (held ++ chunk).toArray
                val cut   = bytes.length - incompleteUtf8Suffix(bytes)
                carry
                  .set(Chunk.fromArray(bytes.slice(cut, bytes.length)))
                  .as(new String(bytes, 0, cut, StandardCharsets.UTF_8))
              }
            }
          val last = carry
            .getAndSet(Chunk.empty)
            .map(rest => if (rest.isEmpty) None else Some(new String(rest.toArray, StandardCharsets.UTF_8)))
          (step, last)
        }
    )

  /**
   * Returns how many bytes at the end of `bytes` (0 to 3) start a UTF-8 sequence whose remaining
   * continuation bytes have not arrived yet. Invalid bytes are not held back; the decoder
   * replaces them as usual.
   */
  private def incompleteUtf8Suffix(bytes: Array[Byte]): Int = {
    val len    = bytes.length
    val lowest = math.max(0, len - 3)
    // Walk back over continuation bytes (10xxxxxx) to the lead byte of the final sequence.
    var i = len - 1
    while (i >= lowest && (bytes(i) & 0xC0) == 0x80) i -= 1
    if (i < lowest) 0
    else {
      val lead = bytes(i) & 0xFF
      val needed =
        if ((lead & 0xE0) == 0xC0) 2
        else if ((lead & 0xF0) == 0xE0) 3
        else if ((lead & 0xF8) == 0xF0) 4
        else 1
      val available = len - i
      if (available < needed) available else 0
    }
  }

  /** Wraps a managed `(step, last)` pair as a transducer. */
  def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] =
    new ZTransducer(process)

  /** Splits each input chunk into sub-chunks of at most `max` elements; no state is kept between steps. */
  def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] =
    succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder

  /**
   * Regroups input chunks into chunks of exactly `size` elements, buffering a shorter remainder
   * between steps. At end of input a non-empty remainder is passed to `pad`, and `pad`'s result
   * becomes the final output.
   */
  def chunkN[R, E, A](
    size: Int,
    pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]]
  ): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[A])
        .map(
          ref =>
            ((chunk: Chunk[A]) =>
              ref.get.flatMap(
                rem =>
                  ZIO.effectSuspendTotal {
                    val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder
                    // `cs` is empty when both the buffer and the input are empty; `cs.last` would throw.
                    if (cs.isEmpty || cs.last.length == size) ref.set(Chunk.empty).as(cs)
                    else ref.set(cs.last).as(cs.init)
                  }
              ),
            ref
              .getAndSet(Chunk.empty)
              .flatMap(
                rem =>
                  if (rem.isEmpty) ZIO.none
                  else pad(rem)
              ))
        )
    )

  /** Passes every element through unchanged. */
  def identity[A]: ZTransducer[Any, Nothing, A, A] =
    succeed(ZIO.succeedNow)

  /** Builds a transducer from a `step` and a `last` effect (default: no final output) that need no managed state. */
  def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] =
    apply(ZManaged.succeedNow(step -> last))
}
} |
Or I could open a WIP pull request?
Always happy to see additional approaches! However the use of chunks is non-negotiable at this point. Can be reconsidered for 2.0.
Thanks @iravid.
I will continue working on this and bug you occasionally :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@iravid
I agree that acceptable performance can only be achieved through chunking.
The part I disagree with is that the library has to force this on every stream instead of chunking only when required.
`ZStream[R, E, Byte]` looks better than `ZStream[R, E, Chunk[Byte]]`, but what utility does it add? I understand that you are busy and that this may not be convincing enough.
I also know that the streams encoding was changed recently and this must have been an exhausting effort.
If I can come up with some benchmarks for the encodings, would you be open to continuing this discussion?
I have created a branch with a basic implementation and a test.
This is what the new test for `chunkN` looks like. Note the double use of the `chunked` combinator on `ZSink.collect[Int]`. The best part is that the new encoding ensures that no intermediate chunks are created while collecting!