Skip to content

Instantly share code, notes, and snippets.

@zainab-ali
Created March 10, 2024 00:16
Show Gist options
  • Save zainab-ali/225b4053bd91876bb347f20f3f48bf66 to your computer and use it in GitHub Desktop.
Save zainab-ali/225b4053bd91876bb347f20f3f48bf66 to your computer and use it in GitHub Desktop.
//> using scala "3.4.0"
//> using lib "co.fs2::fs2-core:3.9.4"
import fs2.*
import cats.syntax.all.*
import cats.effect.*
/** Small fs2 demo: acquires one `Resource` per chunk of a stream and prints
  * a line on allocation, use and release so the ordering can be observed
  * on the console.
  */
object Demo extends IOApp.Simple {

  /** Applies `f` to every chunk pulled from `in`, acquires the resulting
    * resource via `Stream.resource`, and gathers the acquired values.
    *
    * Each acquired value is prepended to the values gathered so far, so the
    * single chunk emitted once `in` is exhausted lists the values in reverse
    * order of the input chunks. An empty `in` yields one empty chunk.
    *
    * NOTE(review): resource lifetimes here are governed by fs2's scoping of
    * `Stream.resource(...).pull`; presumably every resource stays open until
    * the returned stream finishes — confirm by running the demo.
    *
    * @param f  builds the resource to acquire for a given input chunk
    * @param in the stream of chunks to consume
    * @return a stream emitting exactly one chunk holding all acquired values
    */
  def combineResources[O, O2](f: Chunk[O] => Resource[IO, O2])(
      in: Stream[IO, Chunk[O]]
  ): Stream[IO, Chunk[O2]] = {
    // Walks the input one chunk at a time, threading the gathered values along.
    def loop(
        remaining: Stream[IO, Chunk[O]],
        collected: Chunk[O2]
    ): Pull[IO, Chunk[O2], Unit] =
      remaining.pull.uncons1.flatMap {
        case Some((head, tail)) =>
          val acquired = Stream.resource(f(head)).pull.headOrError
          // The recursive call stays in tail position of the flatMap.
          acquired.flatMap { value =>
            loop(tail, Chunk.singleton(value) ++ collected)
          }
        case None =>
          Pull.output1(collected)
      }

    loop(in, Chunk.empty).stream
  }

  /** Builds a resource that prints "Allocate", then "Use", for `chunk` when
    * acquired and "Release" when finalized; the resource's value is `chunk`
    * itself.
    *
    * @param chunk the chunk to report on and hand back
    * @return a resource yielding `chunk`
    */
  def consumeSubstream[O](chunk: Chunk[O]): Resource[IO, Chunk[O]] =
    for {
      _ <- Resource.make(IO.println(s"Allocate $chunk"))(_ => IO.println(s"Release $chunk"))
      _ <- Resource.eval(IO.println(s"Use $chunk"))
    } yield chunk

  /** Entry point: prints each element of `Stream.range(0, 20)` as it is
    * pulled, groups the elements into chunks of 4, feeds those chunks through
    * `combineResources(consumeSubstream)` and drains the result.
    */
  def run = {
    val numbers = Stream.range(0, 20).covary[IO].evalTap(n => IO.println(n))
    val groups = numbers.chunkN(4, allowFewer = false)
    groups.through(combineResources(consumeSubstream)).compile.drain
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment