Skip to content

Instantly share code, notes, and snippets.

@lrodero
Created July 19, 2023 22:46
Show Gist options
  • Save lrodero/636d82b18015d98328ac80c5f5645c13 to your computer and use it in GitHub Desktop.
Save lrodero/636d82b18015d98328ac80c5f5645c13 to your computer and use it in GitHub Desktop.
//> using scala "2.13.11"
//> using lib "org.typelevel::cats-effect::3.5.1"
import cats.effect._
import cats.effect.std.{Console, Queue}
import cats.instances.list._
import cats.syntax.all._
/** Producer/consumer demo built on `cats.effect.std.Queue`.
  *
  * Ten producers push consecutive integers (drawn from one shared counter) into an
  * unbounded queue while ten consumers take them out. All twenty loops run in
  * parallel and never finish on their own, so the program runs until it is
  * interrupted or one of the loops fails.
  */
object ProducerConsumerWithStdQueue extends IOApp {

  /** Endlessly takes the next value from the shared counter and offers it to the queue.
    *
    * @param id       label used only in progress messages
    * @param q        queue the produced values are offered to
    * @param counterR counter shared by all producers; each iteration reads and increments it
    * @return an effect that never completes normally
    */
  def producer(id: Int, q: Queue[IO, Int], counterR: Ref[IO, Int]): IO[Unit] = {
    val produceOne: IO[Unit] =
      for {
        i <- counterR.getAndUpdate(_ + 1)
        _ <- q.offer(i)
        _ <- IO.whenA(i % 10000 == 0)(IO.println(s"Producer $id has reached $i items"))
      } yield ()

    // Loop with foreverM rather than recursing as the last step of a
    // for-comprehension: `for { ...; _ <- producer(...) } yield ()` desugars to
    // `producer(...).map(_ => ())`, which leaves one pending `map` continuation
    // per iteration and therefore grows memory without bound in an endless loop.
    produceOne.foreverM
  }

  /** Endlessly takes values from the queue, reporting every 10000th value seen.
    *
    * @param id label used only in progress messages
    * @param q  queue the values are taken from
    * @return an effect that never completes normally
    */
  def consumer(id: Int, q: Queue[IO, Int]): IO[Unit] = {
    val consumeOne: IO[Unit] =
      for {
        i <- q.take
        _ <- IO.whenA(i % 10000 == 0)(IO.println(s"Consumer $id has reached $i items"))
      } yield ()

    // Same reasoning as in `producer`: foreverM keeps the loop free of
    // accumulating `map` continuations.
    consumeOne.foreverM
  }

  /** Creates the shared counter and queue, then runs 10 producers and 10 consumers in parallel.
    *
    * If any of them raises an error, the error is reported on stderr and the
    * program exits with `ExitCode.Error`.
    */
  override def run(args: List[String]): IO[ExitCode] =
    for {
      counterR <- Ref.of[IO, Int](0)
      q <- Queue.unbounded[IO, Int]
      producers = List.range(1, 11).map(producer(_, q, counterR)) // 10 producers
      consumers = List.range(1, 11).map(consumer(_, q)) // 10 consumers
      res <- (producers ++ consumers)
        .parSequence
        .as(ExitCode.Success)
        .handleErrorWith { t =>
          // getMessage may be null (e.g. exceptions created without a message);
          // fall back to toString so the report is still informative.
          val reason = Option(t.getMessage).getOrElse(t.toString)
          Console[IO].errorln(s"Error caught: $reason").as(ExitCode.Error)
        }
    } yield res
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment