Skip to content

Instantly share code, notes, and snippets.

@ayeo
Created March 10, 2022 15:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ayeo/ac41a752743e0dfaf646cd2482ed5796 to your computer and use it in GitHub Desktop.
Save ayeo/ac41a752743e0dfaf646cd2482ed5796 to your computer and use it in GitHub Desktop.
package pl.ayeo
import cats.effect.IO
import cats.effect.Ref
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import fs2.{Pipe, Stream}
import scala.concurrent.duration.*
import scala.util.Random
/** Demo of a self-messaging "actor" assembled from an fs2 stream, a cats-effect
  * `Ref` (the actor's state) and an unbounded `Queue` (its mailbox).
  *
  * The entry point is an explicit `main` method rather than `extends App`: with
  * `App` the whole (never-ending, blocking) program would be executed as part of
  * this object's initialization, while callbacks running on other threads call
  * back into members of the same object.
  */
object ActorApp {

  /** External input: emits `"tic"` forever, sleeping 500 ms before each element. */
  def input: Stream[IO, String] =
    Stream.repeatEval(IO.sleep(500.millis) >> IO("tic"))

  /** Actor whose only state is a counter of the messages it has handled.
    *
    * @param r mutable cell holding the number of messages handled so far
    */
  case class Actor(r: Ref[IO, Int]) {

    /** Handles one message and optionally produces a follow-up message.
      *
      * The counter is incremented for every message. The reply is decided from
      * the counter value observed ''before'' the increment, so the very first
      * message (count 0) already satisfies the "every fifth" rule.
      *
      * @param input the incoming message
      * @return `Some("Hello to myself")` when the message is `"Hello"`,
      *         `Some("Hello")` when the pre-increment count is a multiple of 5,
      *         `None` otherwise
      */
    def handle(input: String): IO[Option[String]] =
      r.getAndUpdate(_ + 1).map { c =>
        if (input == "Hello") Some("Hello to myself")
        else if (c % 5 == 0) Some("Hello")
        else None
      }
  }

  /** Pipe that feeds every element to `actor` and re-emits the element unchanged.
    *
    * @param q     queue onto which the actor's reply, if any, is offered
    * @param actor actor that handles each element
    * @return a pipe whose output elements are exactly its input elements
    */
  def actorPipe(q: Queue[IO, String], actor: Actor): Pipe[IO, String, String] =
    _.evalMap { message =>
      // traverse_ offers the reply only when there is one and discards the result.
      actor.handle(message).flatMap(_.traverse_(q.offer)).as(message)
    }

  /** Single-element stream that allocates a fresh actor with its counter at 0. */
  val actorStream: Stream[IO, Actor] =
    Stream.eval(Ref[IO].of(0).map(Actor(_)))

  /** Single-element stream that allocates the unbounded mailbox queue. */
  val queueStream: Stream[IO, Queue[IO, String]] =
    Stream.eval(Queue.unbounded[IO, String])

  /** Merges the mailbox with the external input and runs both through the actor.
    *
    * The same queue is read here and written by `actorPipe`, so replies produced
    * by the actor come back to it as new messages.
    */
  val program: Stream[IO, String] =
    actorStream.zip(queueStream).flatMap { (actor, queue) =>
      Stream
        .fromQueueUnterminated(queue)
        .merge(input)
        .through(actorPipe(queue, actor))
    }

  /** Runs `program`, printing each handled message prefixed with the name of the
    * thread that evaluates the print. Both sources of `program` are unbounded, so
    * this does not return under normal operation.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit =
    program
      .evalMap(t => IO.println(s"${Thread.currentThread().getName()}:\t $t"))
      .compile
      .drain
      .unsafeRunSync()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment