Skip to content

Instantly share code, notes, and snippets.

@Baccata
Created January 26, 2023 11:07
Show Gist options
  • Save Baccata/a19c7ec68882f989cb33f281908dcf88 to your computer and use it in GitHub Desktop.
Save Baccata/a19c7ec68882f989cb33f281908dcf88 to your computer and use it in GitHub Desktop.
Wait for queue drain example
//> using lib "co.fs2::fs2-core:3.5.0"
import cats.effect.IOApp
import cats.effect._
import cats.effect.std.Queue
import scala.concurrent.duration._
import cats.effect.syntax.all._
import cats.syntax.all._
import cats.instances.queue
/** Example program: wait for a background consumer to drain a queue before
  * the enclosing `Resource` scope is released.
  *
  * The queue carries `Option[String]`; a `None` element is the termination
  * signal for the consuming stream.
  */
object Main extends IOApp.Simple {

  /** Entry point: uses [[prog]] purely for its effects and fails if the whole
    * thing (including finalizers) takes longer than 5 seconds.
    */
  def run: IO[Unit] = prog.use_.timeout(5.seconds)

  /** A bounded queue (capacity 10) of optional strings, lifted into `Resource`
    * so that it composes with the other steps of [[prog]].
    */
  val makeQueue: Resource[IO, Queue[IO, Option[String]]] =
    Resource.eval(Queue.bounded[IO, Option[String]](10))

  /** Starts a background fiber that prints every element taken from `queue`
    * until a `None` is dequeued.
    *
    * On release, a `None` is offered to the queue and the finalizer then waits
    * for the consumer to finish, so everything enqueued before release has
    * been consumed. If the consumer ended with an error, that error is
    * re-raised from the finalizer instead of being silently dropped.
    *
    * @param queue the queue to drain; `None` terminates consumption
    * @return a resource whose release blocks until the consumer has completed
    */
  def consume(queue: Queue[IO, Option[String]]): Resource[IO, Unit] =
    fs2.Stream
      .fromQueueNoneTerminated(queue)
      .evalTap(IO.println)
      .compile
      .drain
      .background
      .flatMap { waitForOutcome =>
        // Finalizers run in reverse order of registration. This one is
        // registered after `background`, so it runs first: it enqueues the
        // terminating `None` and waits for the consumer to reach it before
        // the background resource itself is released.
        Resource.onFinalize {
          queue.offer(None) >>
            waitForOutcome.flatMap {
              // Surface a consumer failure rather than discarding it.
              case Outcome.Errored(error) => IO.raiseError[Unit](error)
              // Succeeded or Canceled: nothing further to report.
              case _                      => IO.unit
            }
        }
      }

  /** The example scenario: print "start", create the queue, enqueue one
    * element, start the consumer, then print "finish". Releasing this
    * resource triggers the drain-and-wait finalizer installed by [[consume]].
    */
  val prog: Resource[IO, Unit] = for {
    _ <- Resource.eval(IO.println("start"))
    q <- makeQueue
    _ <- Resource.eval(q.offer(Some("first")))
    _ <- consume(q)
    _ <- Resource.eval(IO.println("finish"))
  } yield ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment