Last active
June 5, 2020 08:00
-
-
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Alternative encoding for ZIO stream abstractions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package zio | |
import java.io.{ InputStream, OutputStream } | |
import java.nio.charset.StandardCharsets | |
object streaming { | |
type Pull[-R, +E, +I] = ZIO[R, Option[E], I] | |
type Push[-R, +E, -I] = I => ZIO[R, E, Unit] | |
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O] | |
def example1(is: InputStream): URIO[ZEnv, Unit] = | |
ZStream | |
.fromInputStream(is) | |
.run(ZTransducer.utf8Decode >>> ZTransducer.newLines >>> ZSink.putStrLn.chunked) | |
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) { | |
def chunked: ZSink[R, E, Chunk[I], Z] = | |
ZSink(process.map { case (push, done) => (ZIO.foreach_(_)(push), done) }) | |
} | |
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) { | |
def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] = | |
ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) { | |
case ((pull, done), (step, last)) => | |
val pipe = pull.foldCauseM( | |
Cause | |
.sequenceCauseOption(_) | |
.fold(last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(done.set(true).as(_))))(Pull.halt), | |
step(_).foldCauseM(Pull.halt, Pull.emit) | |
) | |
ZIO.ifM(done.get)(Pull.end, pipe) | |
}) | |
def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] = | |
>>>(transducer) | |
def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ??? | |
def head: ZIO[R, Option[E], I] = | |
pull.use(identity) | |
def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] = | |
(pull <*> sink.process).use { | |
case (pull, (push, done)) => | |
def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go) | |
go | |
} | |
} | |
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) { | |
def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] = | |
ZTransducer(process.zipWith(transducer.process) { | |
case ((ls, lo), (rs, ro)) => | |
(ls.andThen(_.flatMap(rs)), lo.flatMap(_.fold(ro)(rs(_) *> ro))) | |
}) | |
def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] = | |
ZSink(process.zipWith(sink.process) { | |
case ((step, last), (push, done)) => | |
(step.andThen(_.flatMap(push)), last.flatMap(_.fold(done)(push(_) *> done))) | |
}) | |
def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] = | |
ZTransducer(process.map { | |
case (step, last) => | |
((chunk: Chunk[I]) => ZIO.foreach(chunk)(step), last.map(_.map(Chunk.single))) | |
}) | |
} | |
object Pull { | |
val end: IO[Option[Nothing], Nothing] = ZIO.fail(None) | |
def emit[A](a: A): UIO[A] = ZIO.succeedNow(a) | |
def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(cause.map(Option.apply)) | |
} | |
object ZSink { | |
val putStrLn: ZSink[console.Console, Nothing, String, Unit] = | |
apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit))) | |
def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] = | |
new ZSink(process) | |
def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] = | |
apply(ZRef.makeManaged(z).map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get))) | |
def head[A]: ZSink[Any, Nothing, A, Option[A]] = | |
apply( | |
ZRef | |
.makeManaged(Option.empty[A]) | |
.zipWith(ZRef.makeManaged(true))((ref, empty) => (a => ref.set(Some(a)).whenM(empty.get), ref.get)) | |
) | |
def last[A]: ZSink[Any, Nothing, A, Option[A]] = | |
apply(ZRef.makeManaged(Option.empty[A]).map(ref => (a => ref.set(Some(a)), ref.get))) | |
def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ??? | |
} | |
object ZStream { | |
def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] = | |
new ZStream(pull) | |
def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ??? | |
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ??? | |
} | |
object ZTransducer { | |
val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] = | |
apply( | |
ZRef | |
.makeManaged("") | |
.zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) => | |
val di = sep.length | |
((s: String) => | |
ref.get.flatMap( | |
l => | |
ZIO.effectSuspendTotal { | |
val cb = ChunkBuilder.make[String]() | |
var rem = l ++ s | |
var i = rem.indexOf(sep) | |
while (i != -1) { | |
cb += rem.take(i) | |
rem = rem.drop(i + di) | |
i = rem.indexOf(sep) | |
} | |
ref.set(rem).as(cb.result()) | |
} | |
), | |
ref | |
.getAndSet("") | |
.map( | |
s => | |
if (s.isEmpty) None | |
else Option(Chunk.single(s)) | |
)) | |
} | |
) | |
val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] = | |
succeed(chunk => ZIO.succeedNow(new String(chunk.toArray, StandardCharsets.UTF_8))) | |
def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] = | |
new ZTransducer(process) | |
def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] = | |
succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder | |
def chunkN[R, E, A]( | |
size: Int, | |
pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]] | |
): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] = | |
apply( | |
ZRef | |
.makeManaged(Chunk.empty: Chunk[A]) | |
.map( | |
ref => | |
(chunk => | |
ref.get.flatMap( | |
rem => | |
ZIO.effectSuspendTotal { | |
val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder | |
if (cs.last.length == size) ref.set(Chunk.empty).as(cs) else ref.set(cs.last).as(cs.init) | |
} | |
), | |
ref | |
.getAndSet(Chunk.empty) | |
.flatMap( | |
rem => | |
if (rem.isEmpty) ZIO.none | |
else pad(rem) | |
)) | |
) | |
) | |
def identity[A]: ZTransducer[Any, Nothing, A, A] = | |
succeed(ZIO.succeedNow) | |
def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] = | |
apply(ZManaged.succeedNow(step -> last)) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks @iravid.
I will continue working on this and bug you ocassionally :)