Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Created June 28, 2022 07:51
Show Gist options
  • Save Swoorup/86685b2e81467ecb0e4fc0409afb30dc to your computer and use it in GitHub Desktop.
Save Swoorup/86685b2e81467ecb0e4fc0409afb30dc to your computer and use it in GitHub Desktop.
Typed Akka like actors using fs2 and cats
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.effect.*
import fs2.Stream
import cats.effect.std.Queue
import scala.concurrent.duration.*
case class FSM[F[_], S, I, O](run: (S, I) => F[(S, O)])
case class Ping[F[_]](replyTo: Deferred[F, String])
def makeActor[S, I, O](initialState: S, fsm: FSM[IO, S, I, O]): ResourceIO[I => IO[O]] =
for
ref <- Ref.of[IO, S](initialState).toResource
queue <- Queue.unbounded[IO, (I, Deferred[IO, O])].toResource
_ <- Stream
.fromQueueUnterminated(queue)
.evalScan(initialState) { case (state, (msg, replyTo)) =>
fsm
.run(state, msg)
.flatMap { case (newState, output) =>
replyTo.complete(output).as(newState)
}
}
.compile
.drain
.background
ask = (input: I) =>
for
deferred <- Deferred[IO, O]
_ <- queue.offer((input, deferred))
output <- deferred.get
yield output
yield ask
object StreamAskReplyDemo extends IOApp.Simple:
val pongBehavior = FSM[IO, Unit, Ping[IO], Unit] {
case (_, Ping(replyTo)) =>
replyTo
.complete("Hello from pong")
.as(() -> ())
}
val run =
makeActor((), pongBehavior)
.use(pongActor =>
(IO.println("Press enter to get response:\n") *> IO.readLine)
.flatMap(_ =>
for
promise <- Deferred[IO, String]
_ <- pongActor(Ping(promise))
response <- promise.get
_ <- IO.println(s"Got response: $response")
yield ()
)
.foreverM
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment