Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Created May 3, 2022 14:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calvinlfer/89f1191b09b0fb26d3b6c3964df5b0bb to your computer and use it in GitHub Desktop.
Save calvinlfer/89f1191b09b0fb26d3b6c3964df5b0bb to your computer and use it in GitHub Desktop.
Understanding ZChannels (zio-streams 2.x)
object Experiment extends ZIOAppDefault {
/**
Collect elements and emit them in single chunks of 100
*/
val aggregate: ZPipeline[Any, Nothing, Int, Long] = {
def go[Err](acc: Long): ZChannel[Any, Err, Chunk[Int], Any, Err, Chunk[Long], Any] =
ZChannel.readWith[Any, Err, Chunk[Int], Any, Err, Chunk[Long], Any](
in = { inChunk =>
val next = acc + inChunk.sum
if (next > 100L) {
val rem = next - 100L
ZChannel.write(Chunk.single(100L)) *> go(rem)
} else go(next)
},
error = err => ZChannel.fail(err),
done = done => ZChannel.write(Chunk.single(acc)) *> ZChannel.succeed(done)
)
go(0L).toPipeline[Int, Long]
}
override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
ZStream
.iterate(0)(_ + 1)
.take(50)
.via(aggregate)
.debug("aggregator>")
.runCollect
.debug("collect>")
}
object LearningChannel extends ZIOAppDefault {
// Producer of elements
val countUp: UStream[Int] = {
def go(current: Int): ZChannel[Any, Any, Any, Any, Nothing, Chunk[Int], Unit] =
ZChannel.write(Chunk.single(current)) *> go(current + 1)
go(0).toStream
}
override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
countUp
.throttleShape(1, 100.millis)(_.size.toLong)
.debug("writer>")
.runDrain
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment