-
-
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
package zio | |
import java.io.{ InputStream, OutputStream } | |
import java.nio.charset.StandardCharsets | |
object streaming { | |
type Pull[-R, +E, +I] = ZIO[R, Option[E], I] | |
type Push[-R, +E, -I] = I => ZIO[R, E, Unit] | |
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O] | |
def example1(is: InputStream): URIO[ZEnv, Unit] = | |
ZStream | |
.fromInputStream(is) | |
.run(ZTransducer.utf8Decode >>> ZTransducer.newLines >>> ZSink.putStrLn.chunked) | |
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) { | |
def chunked: ZSink[R, E, Chunk[I], Z] = | |
ZSink(process.map { case (push, done) => (ZIO.foreach_(_)(push), done) }) | |
} | |
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) { | |
def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] = | |
ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) { | |
case ((pull, done), (step, last)) => | |
val pipe = pull.foldCauseM( | |
Cause | |
.sequenceCauseOption(_) | |
.fold(last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(done.set(true).as(_))))(Pull.halt), | |
step(_).foldCauseM(Pull.halt, Pull.emit) | |
) | |
ZIO.ifM(done.get)(Pull.end, pipe) | |
}) | |
def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] = | |
>>>(transducer) | |
def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ??? | |
def head: ZIO[R, Option[E], I] = | |
pull.use(identity) | |
def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] = | |
(pull <*> sink.process).use { | |
case (pull, (push, done)) => | |
def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go) | |
go | |
} | |
} | |
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) { | |
def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] = | |
ZTransducer(process.zipWith(transducer.process) { | |
case ((ls, lo), (rs, ro)) => | |
(ls.andThen(_.flatMap(rs)), lo.flatMap(_.fold(ro)(rs(_) *> ro))) | |
}) | |
def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] = | |
ZSink(process.zipWith(sink.process) { | |
case ((step, last), (push, done)) => | |
(step.andThen(_.flatMap(push)), last.flatMap(_.fold(done)(push(_) *> done))) | |
}) | |
def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] = | |
ZTransducer(process.map { | |
case (step, last) => | |
((chunk: Chunk[I]) => ZIO.foreach(chunk)(step), last.map(_.map(Chunk.single))) | |
}) | |
} | |
object Pull { | |
val end: IO[Option[Nothing], Nothing] = ZIO.fail(None) | |
def emit[A](a: A): UIO[A] = ZIO.succeedNow(a) | |
def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(cause.map(Option.apply)) | |
} | |
object ZSink { | |
val putStrLn: ZSink[console.Console, Nothing, String, Unit] = | |
apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit))) | |
def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] = | |
new ZSink(process) | |
def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] = | |
apply(ZRef.makeManaged(z).map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get))) | |
def head[A]: ZSink[Any, Nothing, A, Option[A]] = | |
apply( | |
ZRef | |
.makeManaged(Option.empty[A]) | |
.zipWith(ZRef.makeManaged(true))((ref, empty) => (a => ref.set(Some(a)).whenM(empty.get), ref.get)) | |
) | |
def last[A]: ZSink[Any, Nothing, A, Option[A]] = | |
apply(ZRef.makeManaged(Option.empty[A]).map(ref => (a => ref.set(Some(a)), ref.get))) | |
def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ??? | |
} | |
object ZStream { | |
def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] = | |
new ZStream(pull) | |
def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ??? | |
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ??? | |
} | |
object ZTransducer { | |
val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] = | |
apply( | |
ZRef | |
.makeManaged("") | |
.zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) => | |
val di = sep.length | |
((s: String) => | |
ref.get.flatMap( | |
l => | |
ZIO.effectSuspendTotal { | |
val cb = ChunkBuilder.make[String]() | |
var rem = l ++ s | |
var i = rem.indexOf(sep) | |
while (i != -1) { | |
cb += rem.take(i) | |
rem = rem.drop(i + di) | |
i = rem.indexOf(sep) | |
} | |
ref.set(rem).as(cb.result()) | |
} | |
), | |
ref | |
.getAndSet("") | |
.map( | |
s => | |
if (s.isEmpty) None | |
else Option(Chunk.single(s)) | |
)) | |
} | |
) | |
val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] = | |
succeed(chunk => ZIO.succeedNow(new String(chunk.toArray, StandardCharsets.UTF_8))) | |
def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] = | |
new ZTransducer(process) | |
def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] = | |
succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder | |
def chunkN[R, E, A]( | |
size: Int, | |
pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]] | |
): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] = | |
apply( | |
ZRef | |
.makeManaged(Chunk.empty: Chunk[A]) | |
.map( | |
ref => | |
(chunk => | |
ref.get.flatMap( | |
rem => | |
ZIO.effectSuspendTotal { | |
val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder | |
if (cs.last.length == size) ref.set(Chunk.empty).as(cs) else ref.set(cs.last).as(cs.init) | |
} | |
), | |
ref | |
.getAndSet(Chunk.empty) | |
.flatMap( | |
rem => | |
if (rem.isEmpty) ZIO.none | |
else pad(rem) | |
)) | |
) | |
) | |
def identity[A]: ZTransducer[Any, Nothing, A, A] = | |
succeed(ZIO.succeedNow) | |
def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] = | |
apply(ZManaged.succeedNow(step -> last)) | |
} | |
} |
Note that `ZStream.head` is defined as

    def head: ZIO[R, Option[E], I] =
      pull.use(identity)

Extracting only the first element is not a stream-processing operation; this use case treats `ZStream` as a collection. To run the whole stream and return the head, run the stream with `ZSink.head`.
Thank you for suggesting this! The previous ZStream encoding did not use chunks internally, and it was a conscious decision to extensively embed chunks so that users can enjoy their performance improvements without using them explicitly.
@iravid
I agree that acceptable performance can only be achieved through chunking.
The part I disagree with is that the library has to force this on every stream instead of chunking only when required.
`ZStream[R, E, Byte]` looks better than `ZStream[R, E, Chunk[Byte]]`, but what utility does it add?
I understand that you are busy and that this may not be convincing enough.
I also know that the streams encoding was changed recently and this must have been an exhausting effort.
If I can come up with some benchmarks for the encodings, would you be open to furthering this discussion?
I have created a branch with a basic implementation and a test.
This is what the new test for `chunkN` looks like:
testM("pads last element to size")(
checkM(Gen.chunkOfBounded(0, 5)(Gen.chunkOf(Gen.anyInt)))(chunk =>
ZStream
.fromChunk(chunk)
.run(ZTransducer.chunkN[Int](3, 0) >>> ZSink.collect[Int].chunked.chunked)
.map(xs => assert(xs)(equalTo(chunk.flatten.padTo(xs.length, 0))))
)
)
Note the double use of the `chunked` combinator on `ZSink.collect[Int]`. The best part is that the new encoding ensures that no intermediate chunks are created while collecting!
Or I could open a WIP pull request?
Always happy to see additional approaches! However, the use of chunks is non-negotiable at this point. It can be reconsidered for 2.0.
Thanks @iravid.
I will continue working on this and bug you occasionally :)
This re-design addresses two separate concerns with the current encoding: […] `Chunk` […] `Option` at every step in the pipeline. The concerns are addressed by […] `Chunk` is treated as a normal value and appears in signatures like `ZSink` and `ZTransducer`, respectively, […] from the back-propagation of the final result.