Skip to content

Instantly share code, notes, and snippets.

@fancellu
Created July 14, 2022 11:27
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fancellu/a2a9dd4ac5c1a09ee471d7c3cc0770d9 to your computer and use it in GitHub Desktop.
Save fancellu/a2a9dd4ac5c1a09ee471d7c3cc0770d9 to your computer and use it in GitHub Desktop.
FS2 cats.effect.Queue example using flatMap or for comprehension
import cats.effect._
import fs2._
import scala.concurrent.duration._
import cats.effect.std._
// FS2 cats.effect.Queue example using flatMap or for comprehension
// Both streams emit nothing, but are effectful, communicating via the queue and updating a sum via the ref
/** Demo application: two effectful fs2 streams that talk to each other through a
  * cats-effect `Queue` while accumulating a running total in a `Ref`.
  *
  * Neither the producer nor the consumer emits any elements (both are drained); all
  * observable behaviour comes from the console output and the final value of the `Ref`.
  */
object Fs2Queues extends IOApp.Simple {

  /** Single-element stream that allocates an unbounded `Int` queue when evaluated. */
  val queueS: Stream[IO, Queue[IO, Int]] =
    Stream.eval(Queue.unbounded[IO, Int])

  /** Single-element stream that allocates a `Ref` holding the running sum, starting at 0. */
  val refS: Stream[IO, Ref[IO, Int]] =
    Stream.eval(Ref.of[IO, Int](0))

  /** Counts upwards from 0, metered at 100 ms, logging each number and offering it to `queue`. */
  private def producing(queue: Queue[IO, Int]): Stream[IO, Nothing] =
    Stream
      .iterate(0)(_ + 1)
      .covary[IO]
      .metered(100.milli)
      .evalMap { i =>
        IO.println(s"Producing $i") *> queue.offer(i)
      }
      .drain

  /** Takes numbers from `queue`, logging each one and adding it to the total held in `total`. */
  private def consuming(queue: Queue[IO, Int], total: Ref[IO, Int]): Stream[IO, Nothing] =
    Stream
      .fromQueueUnterminated(queue)
      .evalMap { i =>
        IO.println(s"Consuming $i") *> total.update(_ + i)
      }
      .drain

  /** Reads the accumulated total and prints it, emitting a single `Unit`. */
  private def reportSum(total: Ref[IO, Int]): Stream[IO, Unit] =
    Stream.eval(total.get.flatMap(sum => IO.println(s"Sum: $sum")))

  /** Runs producer and consumer concurrently for one second, then prints the sum. */
  private def exchange(queue: Queue[IO, Int], total: Ref[IO, Int]): Stream[IO, Unit] = {
    val bothSides: Stream[IO, Nothing] = producing(queue).merge(consuming(queue, total))
    bothSides.interruptAfter(1.second) ++ reportSum(total)
  }

  // The same program written with explicit flatMap calls instead of a for comprehension:
  //
  // override def run: IO[Unit] =
  //   queueS
  //     .flatMap { queue =>
  //       refS.flatMap { ref =>
  //         exchange(queue, ref)
  //       }
  //     }
  //     .compile
  //     .drain

  /** Entry point, expressed as a for comprehension over the stream monad. */
  override def run: IO[Unit] = {
    val program: Stream[IO, Unit] =
      for {
        queue <- queueS
        ref   <- refS
        _     <- exchange(queue, ref)
      } yield ()

    program.compile.drain
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment