Created
December 17, 2021 20:47
-
-
Save calvinlfer/a5331d5f659cea61ff612b1b74449fe0 to your computer and use it in GitHub Desktop.
Zymposium part 1
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{BufferedReader, FileReader} | |
/**
 * A chunked, pull-based stream.
 *
 * A stream is described by a managed "pull" effect: using `process` opens the stream, and every
 * evaluation of the resulting effect produces the next chunk of elements. The pull effect fails
 * with `None` to signal the end of the stream and with `Some(e)` to signal the error `e`.
 *
 * @param process managed effect that opens the stream and yields its pull effect
 * @tparam R environment required to open the stream and to pull from it
 * @tparam E type of errors the stream can fail with
 * @tparam A type of the elements emitted by the stream
 */
final case class CStream[-R, +E, +A](
  private val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[A]]]
) { self =>

  /**
   * Emits at most the first `n` elements of this stream and then ends.
   *
   * A chunk that would exceed the limit is truncated so that exactly the remaining number of
   * elements is emitted. Once the limit is reached the upstream is not pulled again.
   *
   * @param n maximum number of elements to emit; values `<= 0` yield an empty stream
   */
  def take(n: Int): CStream[R, E, A] = {
    def go(rem: Ref[Int], pull: ZIO[R, Option[E], Chunk[A]]): ZIO[R, Option[E], Chunk[A]] =
      rem.get.flatMap { remaining =>
        if (remaining <= 0) IO.fail(None)
        else
          pull.flatMap { chunk =>
            // `take` is a no-op when the chunk already fits within the remaining budget, so a
            // single code path covers both the "fits" and the "must truncate" cases.
            val emitted = chunk.take(remaining)
            rem.set(remaining - emitted.length).as(emitted)
          }
      }

    CStream {
      for {
        pull  <- self.process
        state <- Ref.makeManaged(n)
      } yield go(state, pull)
    }
  }

  /**
   * Transforms every element of the stream with the pure function `f`.
   */
  def map[B](f: A => B): CStream[R, E, B] =
    CStream(self.process.map(pull => pull.map(chunk => chunk.map(f))))

  /**
   * Transforms every element of the stream with the effectful function `f`.
   *
   * The elements of each pulled chunk are passed to `f` via `Chunk#mapM`; a failure of `f` fails
   * the stream with that error.
   */
  def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): CStream[R1, E1, B] = {
    def go(pull: ZIO[R, Option[E], Chunk[A]]): ZIO[R1, Option[E1], Chunk[B]] =
      // `asSomeError` always wraps in `Some`, so an error can never be mistaken for the `None`
      // end-of-stream signal (which `Option(_)` would produce for a `null` error value).
      pull.flatMap(chunk => chunk.mapM(f).asSomeError)

    CStream(self.process.map(go))
  }

  /**
   * Repeats this stream indefinitely: whenever the current run of the stream ends, the stream is
   * opened again from `process` and pulling continues from the fresh run.
   *
   * The resources of the run that has ended are released before the next run is opened, and the
   * resources of the run that is current when the resulting stream is closed are released
   * together with it.
   */
  def forever: CStream[R, E, A] = {
    // The pull effect of the currently open run together with the action that releases it.
    final case class ForeverState(
      currentPull: ZIO[R, Option[E], Chunk[A]],
      currentFinalizer: ZManaged.Finalizer
    )

    def go(ref: Ref[ForeverState]): ZIO[R, Option[E], Chunk[A]] =
      ref.get.flatMap { state =>
        state.currentPull.catchAllCause { cause =>
          Cause.sequenceCauseOption(cause) match {
            case Some(_) =>
              // A cause that still carries information after stripping `None`: propagate it.
              IO.halt(cause)
            case None =>
              // End of the current run: release it, open a fresh run and pull from that one.
              ZIO.uninterruptibleMask { restore =>
                state.currentFinalizer(Exit.unit) *>
                  ReleaseMap.make.flatMap { releaseMap =>
                    val finalizer: ZManaged.Finalizer =
                      exit => releaseMap.releaseAll(exit, ExecutionStrategy.Sequential)
                    // Register the new release map *before* acquiring into it, so that whatever
                    // was acquired is still released if the acquisition below does not complete.
                    ref.set(ForeverState(IO.fail(None), finalizer)) *>
                      restore(self.process.zio.provideSome[R](r => (r, releaseMap))).flatMap {
                        case (_, newPull) => ref.set(ForeverState(newPull, finalizer))
                      }
                  }
              } *> go(ref)
          }
        }
      }

    CStream {
      // The initial state has nothing open: its pull reports end-of-stream right away, which makes
      // the first pull open the first run. The release action of this managed value runs the
      // finalizer of whichever run is current when the stream is closed.
      ZManaged
        .makeExit(Ref.make(ForeverState(IO.fail(None), ZManaged.Finalizer.noop)))((ref, exit) =>
          ref.get.flatMap(_.currentFinalizer(exit))
        )
        .map(go)
    }
  }

  /**
   * Runs the stream and combines all of its elements, in order, into a single value.
   *
   * @param zero the initial accumulator, returned as-is for an empty stream
   * @param f    combines the accumulator with the next element
   */
  def fold[Z](zero: Z)(f: (Z, A) => Z): ZIO[R, E, Z] =
    process.use { pull =>
      def loop(acc: Z): ZIO[R, E, Z] =
        pull.foldM(
          failure = {
            case None    => UIO.succeed(acc) // end of stream: the accumulator is the result
            case Some(e) => ZIO.fail(e)
          },
          success = chunk => loop(chunk.foldLeft(acc)(f))
        )

      loop(zero)
    }

  /**
   * Runs the stream to completion, discarding every element.
   */
  def runDrain: ZIO[R, E, Unit] =
    process.use { pull =>
      def loop: ZIO[R, E, Unit] =
        pull.foldM(
          failure = {
            case None    => ZIO.unit
            case Some(e) => ZIO.fail(e)
          },
          success = _ => loop
        )

      loop
    }

  /**
   * Runs the stream to completion and returns all of its elements in a single chunk.
   */
  def runCollect: ZIO[R, E, Chunk[A]] =
    process.zip(ZManaged.succeed(ChunkBuilder.make[A]())).use { case (pull, builder) =>
      def loop: ZIO[R, E, Chunk[A]] =
        pull.foldM(
          failure = {
            case None    => ZIO.succeed(builder.result())
            case Some(e) => ZIO.fail(e)
          },
          success = next => ZIO.succeed(builder ++= next) *> loop
        )

      loop
    }

  /**
   * Runs the effect `f` on every element of the stream and emits the elements unchanged.
   *
   * The results of `f` are discarded; a failure of `f` fails the stream with that error.
   */
  def tap[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): CStream[R1, E1, A] =
    CStream(
      self.process.map { pull =>
        // `foreach_` runs the effects in order without collecting their (unused) results.
        pull.tap(chunk => ZIO.foreach_(chunk)(f).asSomeError)
      }
    )
}
/**
 * Constructors for [[CStream]].
 */
object CStream {

  /**
   * Creates a stream that emits the elements of an iterator, one single-element chunk per pull.
   *
   * `in` is by-name and is evaluated when the stream is opened. Exceptions thrown by the
   * iterator's `hasNext` or `next()` fail the stream with that exception.
   *
   * @param in produces the iterator to read from
   */
  def fromIterator[A](in: => Iterator[A]): CStream[Any, Throwable, A] =
    CStream(
      ZManaged.succeed(in).map { iterator =>
        // One step of the stream: fetch the next element if there is one. Running the iterator
        // inside `effect` routes anything it throws into the stream's typed error channel.
        ZIO
          .effect(if (iterator.hasNext) Some(iterator.next()) else None)
          .asSomeError
          .flatMap {
            case Some(element) => ZIO.succeed(Chunk.single(element))
            case None          => ZIO.fail(None)
          }
      }
    )

  /**
   * Creates a stream that emits the lines of the file at `path`, one single-element chunk per
   * pull.
   *
   * The file is opened when the stream is opened and the reader is closed when the stream is
   * closed. A failure to open the file or to read a line fails the stream with the exception
   * that was thrown.
   *
   * @param path path of the file to read
   */
  def linesFromFile(path: String): CStream[Any, Throwable, String] = {
    // Opening the stream itself cannot fail (the error type of the managed value is `Nothing`), so
    // the outcome of opening the file is captured as an `Either` and reported on the first pull.
    val readFile: ZManaged[Any, Nothing, Either[Throwable, BufferedReader]] =
      ZManaged.make(ZIO.effect(new BufferedReader(new FileReader(path))).either) {
        case Right(reader) => ZIO.effectTotal(reader.close())
        case Left(_)       => ZIO.unit // nothing was opened, so there is nothing to close
      }

    CStream(
      readFile.map {
        case Left(openError) =>
          IO.fail(Some(openError))
        case Right(reader) =>
          // `readLine` returns null at end of input; `Option(...)` turns that into `None`.
          ZIO
            .effect(Option(reader.readLine()))
            .asSomeError
            .flatMap {
              case Some(line) => IO.succeed(Chunk.single(line))
              case None       => IO.fail(None)
            }
      }
    )
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment