Skip to content

Instantly share code, notes, and snippets.

@ayeo
Created March 9, 2022 09:00
Show Gist options
  • Save ayeo/17513d3edd29b4dc439f713e1eed8847 to your computer and use it in GitHub Desktop.
Save ayeo/17513d3edd29b4dc439f713e1eed8847 to your computer and use it in GitHub Desktop.
package pl.ayeo
import cats.effect.IO
import cats.effect.Ref
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import fs2.{Pipe, Stream}
import scala.concurrent.duration.*
/** Console demo: merges integers typed on stdin with a one-per-second tick,
  * feeds every value through a stateful [[QueueApp.Adder]] and prints the result.
  *
  * The entry point is an explicit `main` rather than `extends App`: with `App`
  * the whole pipeline (including `unsafeRunSync()`) would execute inside the
  * object's initializer, which is an initialization-order hazard and can
  * deadlock when runtime threads call back into the still-initializing object.
  */
object QueueApp {

  /** Endless stream of stdin lines parsed with `toIntOption`; non-numeric lines yield `None`. */
  def i1 = Stream.repeatEval(IO.readLine.map(_.toIntOption))

  /** Endless stream that emits `Some(1)` after sleeping one second, repeatedly. */
  def i2 = Stream.repeatEval(IO.sleep(1.second) >> IO(Some(1)))

  /** Pipe that turns every defined input into an effect adding it to a shared [[Adder]].
    *
    * `adder` is evaluated exactly once per application of the pipe, so all
    * elements flowing through it share the same `Ref`. (Evaluating `adder`
    * per element would allocate a fresh, zeroed `Ref` every time and no state
    * would ever accumulate.)
    *
    * @param adder effect that allocates the [[Adder]] used by this pipe
    * @return a pipe emitting, per input, an `IO` that yields `None` for `None`
    *         inputs and `Some(result of Adder.run)` otherwise; the emitted
    *         effects are not run here — the downstream consumer must run them
    */
  def actor2(adder: IO[Adder]): Pipe[IO, Option[Int], IO[Option[Int]]] =
    stream =>
      Stream.eval(adder).flatMap { sharedAdder =>
        stream.map(input => input.traverse(sharedAdder.run))
      }

  /** Sink that runs each incoming effect and prints its result on its own line.
    *
    * Uses `evalMap` so the effect is actually executed; mapping `println`
    * over the stream would only print the unevaluated `IO` value.
    */
  def logger: Pipe[IO, IO[Option[Int]], Unit] =
    _.evalMap(effect => effect.flatMap(result => IO.println(result)))

  /** Adds 2 to the wrapped value, if any. */
  def a2(i: Option[Int]): Option[Int] = i.map(_ + 2)

  /** Running-sum accumulator backed by a `Ref`.
    *
    * @param c holds the current total
    */
  case class Adder(c: Ref[IO, Int]) {

    /** Atomically adds `i` to the total and returns the updated total.
      *
      * (Previously the sum was stored but the input `i` was returned, so the
      * accumulated state was never observable.)
      */
    def run(i: Int): IO[Int] = c.updateAndGet(_ + i)
  }

  /** Effect allocating a new [[Adder]] starting at 0; each evaluation creates an independent one. */
  val ioAdder: IO[Adder] = Ref[IO].of(0).map(Adder(_))

  /** Runs the merged input streams through the adder and the logger; blocks until the stream ends. */
  def main(args: Array[String]): Unit =
    i1.merge(i2).through(actor2(ioAdder)).through(logger).compile.drain.unsafeRunSync()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment