Skip to content

Instantly share code, notes, and snippets.

@ajaychandran
Last active June 5, 2020 08:00
Show Gist options
  • Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Save ajaychandran/96008a1d7dae56b62971659607352f0e to your computer and use it in GitHub Desktop.
Alternative encoding for ZIO stream abstractions
package zio
import java.io.{ InputStream, OutputStream }
import java.nio.charset.StandardCharsets
object streaming {
// An effect yielding the next element `I`. The error channel is `Option[E]`:
// `Pull.end` below fails with `None`, which consumers in this file treat as end-of-stream,
// while `Pull.halt` wraps real errors in `Some`.
type Pull[-R, +E, +I] = ZIO[R, Option[E], I]
// An effectful consumer of a single input `I` that produces no value.
type Push[-R, +E, -I] = I => ZIO[R, E, Unit]
// An effectful transformation of a single input `I` into a single output `O`.
type Step[-R, +E, -I, +O] = I => ZIO[R, E, O]
/**
 * Example pipeline: runs the stream built from `is` through `utf8Decode`, then `newLines`,
 * and finally into the chunked `putStrLn` sink.
 */
def example1(is: InputStream): URIO[ZEnv, Unit] = {
  // bytes -> decoded text -> chunks of complete lines
  val toLines = ZTransducer.utf8Decode >>> ZTransducer.newLines
  // each chunk of lines is handed element-by-element to the console sink
  val printLines = toLines >>> ZSink.putStrLn.chunked
  ZStream.fromInputStream(is).run(printLines)
}
/**
 * A sink described by a managed pair: a `Push` that consumes one input at a time and an
 * effect that produces the final result `Z`.
 */
final class ZSink[-R, +E, -I, +Z](val process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]) {

  /** Adapts this sink to accept chunks: every element of an incoming chunk is passed to the original push. */
  def chunked: ZSink[R, E, Chunk[I], Z] = {
    val lifted = process.map {
      case (pushOne, finish) =>
        val pushAll: Push[R, E, Chunk[I]] = (elems: Chunk[I]) => ZIO.foreach_(elems)(pushOne)
        (pushAll, finish)
    }
    ZSink(lifted)
  }
}
/**
 * A stream described by a managed `pull` effect: each evaluation of the pull yields the next
 * element, and a failure of `None` (see `Pull.end`) marks the end of the stream.
 */
final class ZStream[-R, +E, +I](val pull: URManaged[R, Pull[R, E, I]]) {
/**
 * Pipes this stream through `transducer`.
 *
 * The resulting pull combines the upstream pull, a `done` flag (initially `false`) and the
 * transducer's `(step, last)` pair. Once `done` is set, every pull ends immediately.
 */
def >>>[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
ZStream(pull.zip(ZRef.makeManaged(false)).zipWith(transducer.process) {
case ((pull, done), (step, last)) =>
val pipe = pull.foldCauseM(
// Failure branch: `sequenceCauseOption` separates "end of upstream" (no cause) from a real failure.
// On end, the transducer's `last` is run: a failure is re-raised, `None` ends the stream, and
// `Some(a)` sets `done` and emits `a` as the final element. A real upstream cause is re-raised.
// NOTE(review): `done` is only set when `last` yields a value; when `last` yields `None` the flag
// stays `false`, so a later pull would evaluate the upstream pull again — confirm this is intended.
Cause
.sequenceCauseOption(_)
.fold(last.foldCauseM(Pull.halt, _.fold[Pull[Any, E, A]](Pull.end)(done.set(true).as(_))))(Pull.halt),
// Success branch: feed the element to `step`; its failure is re-raised, its output is emitted.
step(_).foldCauseM(Pull.halt, Pull.emit)
)
ZIO.ifM(done.get)(Pull.end, pipe)
})
/** Alias for `>>>` with a transducer. */
def aggregate[R1 <: R, E1 >: E, I1 >: I, A](transducer: ZTransducer[R1, E1, I1, A]): ZStream[R1, E1, A] =
>>>(transducer)
/** Not implemented yet (`???`). */
def flatMap[R1 <: R, E1 >: E, A](f: I => ZStream[R1, E1, A]): ZStream[R1, E1, A] = ???
/** Acquires the pull and evaluates it once; the `Option[E]` error channel of `Pull` is passed through unchanged. */
def head: ZIO[R, Option[E], I] =
pull.use(identity)
/**
 * Runs this stream into `sink`: both managed processes are acquired together, then elements are
 * pulled and pushed in a loop until the stream ends, at which point the sink's `done` effect
 * provides the result. A real failure cause from the pull is re-raised.
 */
def run[R1 <: R, E1 >: E, I1 >: I, Z](sink: ZSink[R1, E1, I1, Z]): ZIO[R1, E1, Z] =
(pull <*> sink.process).use {
case (pull, (push, done)) =>
// `go` recurses after every successful push; end-of-stream (no cause) completes with `done`.
def go: ZIO[R1, E1, Z] = pull.foldCauseM(Cause.sequenceCauseOption(_).fold(done)(ZIO.halt(_)), push(_) *> go)
go
}
}
/**
 * A transducer described by a managed pair: a `Step` that maps each input to one output and
 * a flush effect that may produce one last output.
 */
final class ZTransducer[-R, +E, -I, +O](val process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]) {

  /**
   * Composes this transducer with `transducer`, feeding every output of this one into the other.
   *
   * On flush, a leftover element of this transducer is run through the downstream step, but the
   * value that step produces is discarded; only the downstream flush value is returned.
   */
  def >>>[R1 <: R, E1 >: E, A](transducer: ZTransducer[R1, E1, O, A]): ZTransducer[R1, E1, I, A] =
    ZTransducer(process.zipWith(transducer.process) { (left, right) =>
      val (leftStep, leftFlush)   = left
      val (rightStep, rightFlush) = right

      val composedStep: Step[R1, E1, I, A] =
        (i: I) => leftStep(i).flatMap(rightStep)

      val composedFlush: ZIO[R1, E1, Option[A]] =
        leftFlush.flatMap {
          case Some(leftover) => rightStep(leftover) *> rightFlush
          case None           => rightFlush
        }

      (composedStep, composedFlush)
    })

  /**
   * Attaches `sink` downstream of this transducer. On completion a leftover element from this
   * transducer's flush is pushed to the sink before the sink's result effect is run.
   */
  def >>>[R1 <: R, E1 >: E, Z](sink: ZSink[R1, E1, O, Z]): ZSink[R1, E1, I, Z] =
    ZSink(process.zipWith(sink.process) { (upstream, downstream) =>
      val (transform, flush) = upstream
      val (consume, finish)  = downstream

      val feed: Push[R1, E1, I] =
        (i: I) => transform(i).flatMap(consume)

      val complete: ZIO[R1, E1, Z] =
        flush.flatMap {
          case Some(leftover) => consume(leftover) *> finish
          case None           => finish
        }

      (feed, complete)
    })

  /** Adapts this transducer to chunks: the step is applied to every element, the flush value is wrapped in a single-element chunk. */
  def chunked: ZTransducer[R, E, Chunk[I], Chunk[O]] =
    ZTransducer(process.map { pair =>
      val (stepOne, flushOne) = pair

      val stepAll: Step[R, E, Chunk[I], Chunk[O]] =
        (elems: Chunk[I]) => ZIO.foreach(elems)(stepOne)

      val flushAll: ZIO[R, E, Option[Chunk[O]]] =
        flushOne.map(leftover => leftover.map(o => Chunk.single(o)))

      (stepAll, flushAll)
    })
}
/** Constructors for the possible outcomes of a single pull. */
object Pull {
// End-of-stream signal: fails with `None`.
val end: IO[Option[Nothing], Nothing] = ZIO.fail(None)
// Successful pull carrying the element `a`.
def emit[A](a: A): UIO[A] = ZIO.succeedNow(a)
// Re-raises `cause` with every error wrapped via `Option.apply`, so it fits the `Option[E]` error channel.
def halt[E](cause: Cause[E]): IO[Option[E], Nothing] = ZIO.halt(cause.map(Option.apply))
}
/** Constructors for [[ZSink]]. */
object ZSink {

  /** Sink that passes every incoming string to `console.putStrLn` and completes with `Unit`. */
  val putStrLn: ZSink[console.Console, Nothing, String, Unit] =
    apply(ZManaged.succeedNow((console.putStrLn(_), ZIO.unit)))

  /** Wraps a managed `(push, done)` pair in a [[ZSink]]. */
  def apply[R, E, I, Z](process: URManaged[R, (Push[R, E, I], ZIO[R, E, Z])]): ZSink[R, E, I, Z] =
    new ZSink(process)

  /**
   * Sink that folds inputs into an accumulator starting at `z`.
   *
   * @param z initial accumulator
   * @param f effectful combining function; note that it receives the input first and the
   *          current accumulator second
   */
  def foldLeftM[R, E, I, Z](z: Z)(f: (I, Z) => ZIO[R, E, Z]): ZSink[R, E, I, Z] =
    apply(
      ZRef
        .makeManaged(z)
        .map(ref => ((i: I) => ref.get.flatMap(f(i, _).flatMap(ref.set)), ref.get))
    )

  /**
   * Sink that remembers the first element pushed to it, or `None` if nothing was pushed.
   *
   * The `first` flag is flipped to `false` by the very push that observes it as `true`, so only
   * the first element is stored. (Previously the flag was read but never cleared, which made
   * every push overwrite the stored value and the sink return the last element instead.)
   */
  def head[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(
      ZRef
        .makeManaged(Option.empty[A])
        .zipWith(ZRef.makeManaged(true)) { (ref, first) =>
          ((a: A) => ref.set(Some(a)).whenM(first.getAndSet(false)), ref.get)
        }
    )

  /** Sink that remembers the most recent element pushed to it, or `None` if nothing was pushed. */
  def last[A]: ZSink[Any, Nothing, A, Option[A]] =
    apply(ZRef.makeManaged(Option.empty[A]).map(ref => ((a: A) => ref.set(Some(a)), ref.get)))

  /** Not implemented yet (`???`). */
  def toOutputStream(os: OutputStream): ZSink[Any, Nothing, Chunk[Byte], Int] = ???
}
/** Constructors for `ZStream`. */
object ZStream {
// Wraps a managed pull effect in a `ZStream`.
def apply[R, E, I](pull: URManaged[R, Pull[R, E, I]]): ZStream[R, E, I] =
new ZStream(pull)
// Not implemented yet (`???`).
def fromChunk[I](chunk: Chunk[I]): ZStream[Any, Nothing, I] = ???
// Not implemented yet (`???`).
def fromInputStream(is: InputStream): ZStream[Any, Nothing, Chunk[Byte]] = ???
}
/** Constructors for [[ZTransducer]]. */
object ZTransducer {

  /**
   * Splits incoming text into lines, using the separator obtained from `system.System`'s
   * `lineSeparator`.
   *
   * Text after the last separator is buffered and prepended to the next input, so a line (or a
   * separator) may span several inputs. On flush, any non-empty buffered text is emitted as a
   * final single-element chunk.
   */
  val newLines: ZTransducer[system.System, Nothing, String, Chunk[String]] =
    apply(
      ZRef
        .makeManaged("")
        .zipWith(ZIO.accessM[system.System](_.get.lineSeparator).toManaged_) { (ref, sep) =>
          val di = sep.length
          ((s: String) =>
             ref.get.flatMap(
               l =>
                 ZIO.effectSuspendTotal {
                   val cb  = ChunkBuilder.make[String]()
                   var rem = l ++ s
                   var i   = rem.indexOf(sep)
                   // peel off one complete line per iteration; `rem` keeps the unterminated tail
                   while (i != -1) {
                     cb += rem.take(i)
                     rem = rem.drop(i + di)
                     i = rem.indexOf(sep)
                   }
                   ref.set(rem).as(cb.result())
                 }
             ),
           ref
             .getAndSet("")
             .map(
               s =>
                 if (s.isEmpty) None
                 else Option(Chunk.single(s))
             ))
        }
    )

  /**
   * Decodes chunks of bytes as UTF-8 text.
   *
   * A multi-byte character may be split across two chunks; decoding each chunk on its own would
   * turn both halves into replacement characters. The trailing bytes of an unfinished character
   * are therefore held back and prepended to the next chunk. On flush, bytes still held back
   * (a truncated character at the very end of the input) are decoded as they are, so no input is
   * silently dropped.
   */
  val utf8Decode: ZTransducer[Any, Nothing, Chunk[Byte], String] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[Byte])
        .map { carry =>
          val step: Step[Any, Nothing, Chunk[Byte], String] =
            (chunk: Chunk[Byte]) =>
              carry.get.flatMap { pending =>
                val bytes    = pending ++ chunk
                val complete = bytes.length - incompleteUtf8Suffix(bytes)
                carry
                  .set(bytes.drop(complete))
                  .as(new String(bytes.take(complete).toArray, StandardCharsets.UTF_8))
              }

          val flush: UIO[Option[String]] =
            carry
              .getAndSet(Chunk.empty)
              .map(rest => if (rest.isEmpty) None else Some(new String(rest.toArray, StandardCharsets.UTF_8)))

          (step, flush)
        }
    )

  /**
   * Number of bytes at the end of `bytes` that form the beginning of a UTF-8 character whose
   * remaining bytes have not arrived yet; `0` when the input ends on a character boundary or
   * the tail is not a valid character prefix.
   */
  private def incompleteUtf8Suffix(bytes: Chunk[Byte]): Int = {
    val len = bytes.length
    // A UTF-8 character is at most 4 bytes, so its lead byte is within the last 4 positions.
    // Continuation bytes have the bit pattern 10xxxxxx.
    val leadIndex = (len - 1 to math.max(0, len - 4) by -1).find(i => (bytes(i) & 0xC0) != 0x80)
    leadIndex.fold(0) { i =>
      val lead = bytes(i) & 0xFF
      val needed =
        if ((lead & 0xE0) == 0xC0) 2
        else if ((lead & 0xF0) == 0xE0) 3
        else if ((lead & 0xF8) == 0xF0) 4
        else 1
      val available = len - i
      if (available < needed) available else 0
    }
  }

  /** Wraps a managed `(step, last)` pair in a [[ZTransducer]]. */
  def apply[R, E, I, O](process: URManaged[R, (Step[R, E, I, O], ZIO[R, E, Option[O]])]): ZTransducer[R, E, I, O] =
    new ZTransducer(process)

  /** Splits every incoming chunk into groups of at most `max` elements. */
  def chunkLimit[A](max: Int): ZTransducer[Any, Nothing, Chunk[A], Chunk[Chunk[A]]] =
    succeed(chunk => ZIO.succeedNow(Chunk.fromIterable(chunk.grouped(max).toList))) // TODO use ChunkBuilder

  /**
   * Regroups incoming chunks into groups of exactly `size` elements.
   *
   * Elements that do not fill a whole group are buffered and prepended to the next input.
   *
   * @param size number of elements per emitted group
   * @param pad  invoked on flush with the non-empty remainder to decide what (if anything) to emit
   */
  def chunkN[R, E, A](
    size: Int,
    pad: Chunk[A] => ZIO[R, E, Option[Chunk[Chunk[A]]]]
  ): ZTransducer[R, E, Chunk[A], Chunk[Chunk[A]]] =
    apply(
      ZRef
        .makeManaged(Chunk.empty: Chunk[A])
        .map(
          ref =>
            ((chunk: Chunk[A]) =>
               ref.get.flatMap(
                 rem =>
                   ZIO.effectSuspendTotal {
                     val cs = Chunk.fromIterable((rem ++ chunk).grouped(size).toList) // TODO use ChunkBuilder
                     cs.lastOption match {
                       // empty input with an empty buffer: nothing to emit, and `cs.last` would throw
                       case None =>
                         ZIO.succeedNow(Chunk.empty: Chunk[Chunk[A]])
                       // the last group is full: emit everything, nothing left to buffer
                       case Some(tail) if tail.length == size =>
                         ref.set(Chunk.empty).as(cs)
                       // the last group is partial: buffer it and emit only the full groups
                       case Some(tail) =>
                         ref.set(tail).as(cs.init)
                     }
                   }
               ),
             ref
               .getAndSet(Chunk.empty)
               .flatMap(
                 rem =>
                   if (rem.isEmpty) ZIO.none
                   else pad(rem)
               ))
        )
    )

  /** Transducer that passes every input through unchanged. */
  def identity[A]: ZTransducer[Any, Nothing, A, A] =
    succeed(ZIO.succeedNow)

  /**
   * Builds a transducer from a `step` function and an optional flush effect.
   *
   * @param step applied to every input
   * @param last flush effect; defaults to producing no final element
   */
  def succeed[R, E, I, O](step: Step[R, E, I, O], last: ZIO[R, E, Option[O]] = ZIO.none): ZTransducer[R, E, I, O] =
    apply(ZManaged.succeedNow(step -> last))
}
}
@ajaychandran
Copy link
Author

Thanks @iravid.
I will continue working on this and bug you occasionally :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment