Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created December 17, 2021 20:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calvinlfer/a5331d5f659cea61ff612b1b74449fe0 to your computer and use it in GitHub Desktop.
Save calvinlfer/a5331d5f659cea61ff612b1b74449fe0 to your computer and use it in GitHub Desktop.
Zymposium part 1
import java.io.{BufferedReader, FileReader}
final case class CStream[-R, +E, +A](
private val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[A]]]
) { self =>
def take(n: Int): CStream[R, E, A] = {
def go(rem: Ref[Int], pull: ZIO[R, Option[E], Chunk[A]]): ZIO[R, Option[E], Chunk[A]] =
rem.get.flatMap {
case n if n <= 0 => IO.fail(None)
case n =>
pull.flatMap(chunk =>
if (chunk.length > n) UIO.succeed(chunk.take(n)) *> IO.fail(None)
else rem.update(_ - chunk.length).as(chunk)
)
}
CStream {
for {
pull <- self.process
state <- Ref.makeManaged(n)
finitePull = go(state, pull)
} yield finitePull
}
}
def map[B](f: A => B): CStream[R, E, B] =
self.copy(process = self.process.map(zioA => zioA.map(chunkA => chunkA.map(f))))
def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): CStream[R1, E1, B] = {
def go(pull: ZIO[R, Option[E], Chunk[A]]): ZIO[R1, Option[E1], Chunk[B]] =
pull.flatMap(chunkA => chunkA.mapM(f).mapError(Option(_)))
CStream {
self.process.map(go)
}
}
def forever: CStream[R, E, A] = {
final case class ForeverState(
currentPull: ZIO[R, Option[E], Chunk[A]],
currentFinalizer: ZManaged.Finalizer
)
def go(ref: Ref[ForeverState]): ZIO[R, Option[E], Chunk[A]] =
ref.get
.flatMap(state =>
state.currentPull
.catchAllCause(cause =>
Cause.sequenceCauseOption(cause) match {
case Some(value) =>
IO.halt(cause)
case None =>
ZIO.uninterruptibleMask { restore =>
state.currentFinalizer(Exit.unit) *>
ReleaseMap.make.flatMap { releaseMap =>
restore(self.process.zio.provideSome[R](r => (r, releaseMap))).flatMap { case (_, newPull) =>
val finalizer: ZManaged.Finalizer =
exit => releaseMap.releaseAll(exit, ExecutionStrategy.Sequential)
ref.update(_.copy(currentPull = newPull, currentFinalizer = finalizer))
}
} *> go(ref)
}
}
)
)
CStream {
Ref
.makeManaged(ForeverState(IO.fail(None), ZManaged.Finalizer.noop))
.map(go)
}
}
def fold[Z](zero: Z)(f: (Z, A) => Z): ZIO[R, E, Z] =
process.use { pull =>
def go(acc: Z, pull: ZIO[R, Option[E], Chunk[A]]): ZIO[R, E, Z] =
pull.foldM(
failure = {
case None => UIO.succeed(acc)
case Some(e) => ZIO.fail(e)
},
success = { chunk =>
val nextAcc = chunk.foldLeft(acc)(f)
go(nextAcc, pull)
}
)
go(zero, pull)
}
def runDrain: ZIO[R, E, Unit] =
process.use { pull =>
lazy val loop: ZIO[R, E, Unit] =
pull.foldM(
failure = {
case None => ZIO.unit
case Some(e) => ZIO.fail(e)
},
success = _ => loop
)
loop
}
def runCollect: ZIO[R, E, Chunk[A]] =
process.zip(ZManaged.succeed(ChunkBuilder.make[A]())).use { case (pull, builder) =>
lazy val loop: ZIO[R, E, Chunk[A]] =
pull.foldM(
failure = {
case None => ZIO.succeed(builder.result())
case Some(e) => ZIO.fail(e)
},
success = next => ZIO.succeed(builder ++= next) *> loop
)
loop
}
def tap[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): CStream[R1, E1, A] =
CStream(
self.process.map { pull =>
pull.tap(chunk => chunk.mapM(f).mapError(Option(_)))
}
)
}
object CStream {
def fromIterator[A](in: => Iterator[A]): CStream[Any, Throwable, A] =
CStream(
ZManaged.succeed(in).map { iterator =>
// describe how to execute one step
ZIO.succeed(iterator.hasNext).flatMap {
case true => ZIO.succeed(Chunk.single(iterator.next()))
case false => ZIO.fail(None)
}
}
)
def linesFromFile(path: String): CStream[Any, Throwable, String] = {
val readFile = ZManaged.make(
acquire = ZIO.effect(new BufferedReader(new FileReader(path)))
)(release = reader => ZIO.effectTotal(reader.close()))
CStream(
readFile.map { reader =>
ZIO
.effect(reader.readLine())
.mapError(Option(_))
.flatMap { s =>
if (s == null) IO.fail(None)
else IO.succeed(Chunk.single(s))
}
}.orDie
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment