Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Last active July 17, 2023 19:54
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Swoorup/1ac9b69e0c0f1c0925d1397a94b0a762 to your computer and use it in GitHub Desktop.
Save Swoorup/1ac9b69e0c0f1c0925d1397a94b0a762 to your computer and use it in GitHub Desktop.
Typed Actors using Cats Effect, FS2 and Deferred Magic
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.effect.*
import fs2.Stream
import cats.effect.std.Queue
import scala.concurrent.duration.*
import lib.FSM
import lib.actor.{Actor, AskMsg}
/** Behaviour of an actor at one fixed response type `O`: given the current state `S` and a
  * message of type `I[O]`, produce in `F` the next state paired with either an `O` or `Unit`.
  */
type ActorBehavior[F[_], S, -I[_], O] = ((S, I[O]) => F[(S, O | Unit)])
/** Polymorphic-function form of the same behaviour shape: a single handler that can be
  * applied at every response type `O`.
  */
type ActorBehaviorK[F[_], S, -I[_]] = [O] => ((S, I[O]) => F[(S, O | Unit)])
/** Higher-kinded actor API: a message of type `I[O]` carries its expected response type `O`.
  *
  * @note
  *   Currently this isn't very pleasant to use. ETA expansion and type inference for
  *   polymorphic function types need to be improved:
  *   - https://github.com/lampepfl/dotty/issues/15555
  *   - https://github.com/lampepfl/dotty/issues/15554
  */
trait ActorK[F[_], -I[_]]:

  /** Send a one-way message without waiting for acknowledgement.
    * @param msg the message to send
    */
  def send[O](msg: I[O]): F[Unit]

  /** Send a message and wait for acknowledgement together with its output.
    * @param msg the message to send
    */
  def ask[O](msg: I[O]): F[O]

  /** Closed status of the actor.
    * NOTE(review): the exact meaning is defined by the implementation — confirm there.
    */
  def closed: F[Boolean]

  /** Operator alias for `send`. */
  infix def ![O](msg: I[O]): F[Unit] = send(msg)

  /** Operator alias for `ask`. */
  infix def ?[O](msg: I[O]): F[O] = ask(msg)
object ActorK:

  /** Create a new higher-kinded actor whose behaviour can reference the actor itself.
    *
    * Built on top of `Actor.makeFull` with the message and response types erased to
    * `I[Any]` / `Any`; the typed view is restored with casts.
    *
    * @param initialState the initial state
    * @param createFSM    behaviour constructor receiving the self actor reference
    * @param finalize     the finalizer effect, called with the last state
    * @return the actor, wrapped in a `Resource`
    */
  def makeFullK[F[_]: Temporal, S, I[_]](
      initialState: S,
      createFSM: [O] => (self: ActorK[F, I]) => ActorBehavior[F, S, I, O],
      finalize: (state: S) => F[Unit]
  ): Resource[F, ActorK[F, I]] =
    // Typed facade over the erased actor. The casts only re-label the response type that was
    // erased to `Any` when the underlying actor was created.
    def typed(erased: Actor[F, I[Any], Any]): ActorK[F, I] =
      new ActorK[F, I]:
        def send[O](msg: I[O]): F[Unit] = erased.send(msg.asInstanceOf[I[Any]])
        def ask[O](msg: I[O]): F[O] = erased.ask(msg.asInstanceOf[I[Any]]).asInstanceOf[F[O]]
        def closed: F[Boolean] = erased.closed

    val erasedActor =
      Actor.makeFull[F, S, I[Any], Any](
        initialState,
        self => FSM[F, S, I[Any], Any](createFSM(typed(self))),
        finalize
      )
    erasedActor.map(typed)

  /** Create a new higher-kinded actor from a behaviour that does not need a self reference.
    *
    * @param initialState the initial state
    * @param fsm          the polymorphic behaviour, applicable at every response type
    * @param finalize     the finalizer effect, called with the last state
    * @return the actor, wrapped in a `Resource`
    */
  def makeK[F[_]: Temporal, S, I[_]](
      initialState: S,
      fsm: ActorBehaviorK[F, S, I],
      finalize: (state: S) => F[Unit]
  ): Resource[F, ActorK[F, I]] =
    makeFullK(initialState, [O] => (_: ActorK[F, I]) => fsm[O], finalize)
package lib
import cats.syntax.all.*
import cats.effect.std.Queue
import cats.effect.{Concurrent, Deferred, Ref, Resource}
import cats.effect.syntax.all.*
import fs2.Stream
import FSM
import scala.annotation.targetName
/** A message that carries the handle through which its reply is delivered.
  *
  * @tparam F effect type of the `Deferred`
  * @tparam R type of the reply
  */
trait AskMsg[F[_], R]:

  /** The `Deferred` to be completed with the reply to this message. */
  def replyTo: Deferred[F, R]
/** An actor accepting messages of type `I` and producing an output of type `O` per message.
  *
  * @tparam F effect type
  * @tparam I input message type
  * @tparam O output type
  */
trait Actor[F[_], I, O]:

  /** Send the message without waiting for acknowledgement.
    * @param msg the message to send
    */
  def sendNoWait(msg: I): F[Unit]

  /** Send the message and wait for acknowledgement with output.
    * @param msg the message to send
    */
  def send(msg: I): F[O]

  /** Send a request message and wait for its typed response.
    * @param msg builds the request message from the `Deferred` that will receive the response
    */
  def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response]

  /** Operator alias for `sendNoWait`. */
  @targetName("sendNoWait_infix")
  infix def !(input: I): F[Unit] = sendNoWait(input)
/** Raised by an actor's send operations once the actor has been marked dead.
  * @param msg the exception message
  */
case class ActorDeadException(msg: String) extends Exception(msg)
object Actor:

  /** Create a new actor whose FSM can reference the actor itself.
    *
    * Messages are queued in an unbounded mailbox and folded through the FSM one at a time by
    * a stream started in the background of the returned `Resource`.
    *
    * @param initialState the initial state
    * @param createFsm    fsm constructor receiving the self actor reference
    * @param finalize     the finalizer effect, called with the last recorded state
    * @return the actor, wrapped in a `Resource`
    */
  def makeFull[F[_]: Concurrent, S, I, O](
      initialState: S,
      createFsm: (self: Actor[F, I, O]) => FSM[F, S, I, O],
      finalize: (state: S) => F[Unit]
  ): Resource[F, Actor[F, I, O]] =
    for
      selfRef <- Deferred[F, Actor[F, I, O]].toResource
      mailbox <- Queue.unbounded[F, (I, Deferred[F, O])].toResource
      deadFlag <- Ref.of[F, Boolean](false).toResource
      _ <- Stream
        .eval((Ref.of[F, S](initialState), selfRef.get).tupled)
        .flatMap { (lastState, self) =>
          val fsm = createFsm(self)
          // Mark the actor dead first, then hand the last recorded state to the finalizer.
          val shutdown = (deadFlag.set(true) *> lastState.get.flatMap(finalize)).uncancelable
          Stream
            .fromQueueUnterminated(mailbox)
            .evalScan(initialState) { case (state, (input, replyTo)) =>
              fsm.run(state, input).flatMap { (nextState, output) =>
                // Record the new state before acknowledging the sender.
                lastState.set(nextState) *> replyTo.complete(output).as(nextState)
              }
            }
            .onFinalize(shutdown)
        }
        .compile
        .drain
        .background
      throwIfDead = deadFlag.get.flatMap(dead => Concurrent[F].raiseWhen(dead)(ActorDeadException("Actor is dead")))
      actor = new Actor[F, I, O]:
        def sendNoWait(input: I): F[Unit] =
          throwIfDead *> Deferred[F, O].flatMap(reply => mailbox.offer((input, reply))).void
        def send(msg: I): F[O] =
          throwIfDead *> Deferred[F, O].flatMap(reply => mailbox.offer((msg, reply)) *> reply.get)
        def ask[Response, M <: I & AskMsg[F, Response]](msg: Deferred[F, Response] => M): F[Response] =
          throwIfDead *> Deferred[F, Response].flatMap(answer => send(msg(answer)) *> answer.get)
      // Publish the actor so the background stream can build the FSM with its self reference.
      _ <- selfRef.complete(actor).toResource
    yield actor

  /** Create a new actor with a finalizer.
    *
    * @param initialState initial state of the actor
    * @param fsm          the finite state machine
    * @param finalize     the cleanup effect, called with the last known state
    * @return the actor, wrapped in a `Resource`
    */
  def makeWithFinalize[F[_]: Concurrent, S, I, O](
      initialState: S,
      fsm: FSM[F, S, I, O],
      finalize: S => F[Unit]
  ): Resource[F, Actor[F, I, O]] =
    makeFull[F, S, I, O](initialState, _ => fsm, finalize)

  /** Create a new actor whose finalizer does nothing.
    *
    * @param initialState initial state of the actor
    * @param fsm          the finite state machine
    * @return the actor, wrapped in a `Resource`
    */
  def make[F[_]: Concurrent, S, I, O](
      initialState: S,
      fsm: FSM[F, S, I, O]
  ): Resource[F, Actor[F, I, O]] =
    makeWithFinalize(initialState, fsm, _ => Concurrent[F].unit)

  /** Create a new actor and expose only its `send` operation as a plain function.
    *
    * @param initialState initial state of the actor
    * @param fsm          the finite state machine
    * @return a function sending a message and waiting for its output, wrapped in a `Resource`
    */
  def makeSimple[F[_]: Concurrent, S, I, O](
      initialState: S,
      fsm: FSM[F, S, I, O]
  ): Resource[F, I => F[O]] =
    make(initialState, fsm).map(actor => (msg: I) => actor.send(msg))
package lib
import scala.util.chaining.*
import cats.syntax.all.*
import cats.{Applicative, Functor, Id}
/** A state-machine step function.
  *
  * @tparam F effect
  * @tparam S state
  * @tparam I input
  * @tparam O output
  * @param run maps the current state and an input to the next state and an output, in `F`
  */
case class FSM[F[_], S, I, O](run: (S, I) => F[(S, O)]):

  /** The step function with the output dropped, keeping only the next state. */
  def runS(using F: Functor[F]): (S, I) => F[S] =
    (state, input) => F.map(run(state, input)) { case (next, _) => next }
object FSM:

  /** Build an FSM in the `Id` effect from a step function.
    * @param run maps state and input to the next state and output
    */
  def id[S, I, O](run: (S, I) => Id[(S, O)]): FSM[Id, S, I, O] =
    FSM[Id, S, I, O](run)

  /** Build an FSM from a pure step function by lifting each result into `F`.
    * @param run maps state and input to the next state and output
    */
  def pure[F[_]: Applicative, S, I, O](run: (S, I) => (S, O)): FSM[F, S, I, O] =
    FSM[F, S, I, O]((state, input) => run(state, input).pure[F])
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.effect.*
import fs2.Stream
import cats.effect.std.Queue
import scala.concurrent.duration.*
import lib.FSM
import lib.actor.{Actor, AskMsg}
/** Ask-style ping message.
  * @param from    name of the sender, echoed in the reply
  * @param replyTo completed with the `String` reply
  */
case class Ping[F[_]](from: String, replyTo: Deferred[F, String]) extends AskMsg[F, String]
/** Behaviour of the pong actor: stateless; answers every `Ping` by completing its `replyTo`
  * with a greeting that names the sender.
  */
val pongBehavior = FSM[IO, Unit, Ping[IO], Unit] { (_, ping) =>
  // Optional artificial delay, left disabled: IO.sleep(2.seconds) *>
  val greeting = s"Hello ${ping.from} from pong"
  ping.replyTo.complete(greeting).as(((), ()))
}
/** Messages accepted by the counter actor: either a bare `Int` or a `CounterInput` command. */
type CounterMessages[F[_]] = Int | CounterInput[F]
/** Commands understood by the counter actor.
  * @tparam F effect type of the reply handle carried by `Get`
  */
enum CounterInput[F[_]]:
  /** Increment command. */
  case Inc()

  /** Decrement command. */
  case Dec()

  /** Ask-style query; `replyTo` receives an `Int` reply. */
  case Get(replyTo: Deferred[F, Int]) extends CounterInput[F] with AskMsg[F, Int]
/** Behaviour of the counter actor; the state is the current count.
  *
  *   - `Inc()` adds one to the count, `Dec()` subtracts one
  *   - `Get(replyTo)` completes `replyTo` with the current count and leaves it unchanged
  *   - a bare `Int` replaces the count with that value
  */
val counterBehavior = FSM[IO, Int, CounterMessages[IO], Unit] {
  case (s, CounterInput.Inc()) =>
    IO(s + 1, ())
  case (s, CounterInput.Dec()) =>
    // Decrement must subtract; this branch previously duplicated the `Inc()` arithmetic.
    IO(s - 1, ())
  case (s, CounterInput.Get(replyTo)) =>
    replyTo.complete(s) *> IO(s, ())
  case (_, value: Int) =>
    IO(value, ())
}
/** Console demo driving the pong and counter actors through `!` and `ask`.
  *
  * Reads one command per line until `quit` is entered.
  */
object StreamAskReplyDemo extends IOApp.Simple:
  /** Matches `set <digits>`. The digit group may be empty or exceed the `Int` range, so the
    * captured text is parsed defensively before use.
    */
  val SetPattern = "(set) ([0-9]*)".r

  val run: IO[Unit] =
    (
      Actor.make((), pongBehavior),
      Actor.make(0, counterBehavior)
    ).tupled
      .use((pongActor, counterActor) =>
        IO.println("Valid commands are [inc, dec, get, pong, set, quit]: ") *>
          (for
            input <- IO.readLine
            _ <- input match
              case "inc" => counterActor ! CounterInput.Inc()
              case "dec" => counterActor ! CounterInput.Dec()
              case SetPattern(_, param) =>
                // `toInt` would throw on "set " (no digits) or on overflow and end the session.
                param.toIntOption match
                  case Some(value) => counterActor ! value
                  case None        => IO.println(s"Invalid number: '${param}'")
              case "get" =>
                counterActor
                  .ask(CounterInput.Get(_))
                  .flatMap(response => IO.println(s"Counter: ${response}"))
              case "pong" =>
                pongActor
                  .ask(Ping("Sytherax", _))
                  .timeout(1.second)
                  .flatMap(c => IO.println(s"Ping Response: ${c}"))
              case "quit" => IO.unit
              case _      => IO.println("Invalid input")
          yield input).iterateUntil(_ == "quit").void
      )
import lib.actor.ActorK
import lib.actor.*
/** Console demo driving the higher-kinded (`ActorK`) pong and counter actors.
  *
  * Reads one command per line until `quit` is entered.
  */
object TypedActorsKDemo extends IOApp.Simple:
  /** The only message understood by the pong actor. */
  case object Ping

  /** Message family of the pong actor; the response type parameter is ignored. */
  type M[A] = Ping.type

  /** Pong behaviour: sleeps for ten seconds, prints "Pong", keeps the `Unit` state. */
  def pongBehaviorK[O]: ActorBehavior[IO, Unit, M, O] =
    case (_, Ping) =>
      IO.sleep(10.seconds) *> IO.println("Pong") *> IO((), ())

  /** Counter commands, indexed by their response type. */
  enum CounterInput[+Response]:
    case Inc
    case Dec
    case Get extends CounterInput[Int]

  /** Messages accepted by the counter actor: a bare `Int` or a `CounterInput` command. */
  type Message[T] = Int | CounterInput[T]

  /** Counter behaviour: `Inc`/`Dec` adjust the count by one, `Get` returns the count as the
    * output, and a bare `Int` replaces the count.
    */
  def counterBehaviorK[O]: ActorBehavior[IO, Int, Message, O] =
    case (state, CounterInput.Inc) =>
      IO(state + 1, ())
    case (state, CounterInput.Dec) =>
      IO(state - 1, ())
    case (state, CounterInput.Get) =>
      IO(state, state)
    case (state, value: Int) =>
      IO(value, ())

  /** Matches `set <digits>`. The digit group may be empty or exceed the `Int` range, so the
    * captured text is parsed defensively before use.
    */
  val SetPattern = "(set) ([0-9]*)".r

  val run: IO[Unit] =
    (
      ActorK.makeK[IO, Int, Message](0, [O] => counterBehaviorK(_: Int, _: Message[O]), _ => IO.unit),
      ActorK.makeK[IO, Unit, M]((), [O] => pongBehaviorK(_: Unit, _: M[O]), _ => IO.unit)
    ).tupled
      .use((counterActor, pongActor) =>
        IO.println("Valid commands are [inc, dec, get, set, ping, quit]: ") *>
          (for
            input <- IO.readLine
            _ <- input match
              case "inc" => counterActor ! CounterInput.Inc
              case "dec" => counterActor ? CounterInput.Dec
              case SetPattern(_, param) =>
                // `toInt` would throw on "set " (no digits) or on overflow and end the session.
                param.toIntOption match
                  case Some(value) => counterActor ! value
                  case None        => IO.println(s"Invalid number: '${param}'")
              case "get" =>
                counterActor
                  .?(CounterInput.Get)
                  .flatMap(response => IO.println(s"Get Response: ${response}"))
              case "ping" => pongActor ? Ping
              case "quit" => IO.unit
              case _      => IO.println("Invalid input")
          yield input).iterateUntil(_ == "quit").void
      )
@Swoorup
Copy link
Author

Swoorup commented Jun 28, 2022

From Daniel

Couple thoughts…

Are you trying to use types to enforce that all ask messages receive a response? If you don't do this, then I think you can simplify things out quite a bit. In particular, the choice of ask vs send becomes a call-site thing that the receiver is oblivious to, and the receiver can send responses regardless. Typing the response of an ask message does make sense to me.

I think ask can probably have a simpler signature. In particular, rather than making the indirection of the Deferred part of the message (something which every message would then have to encode), you could do it behind the scenes for the user and just allow ask and send to be almost exactly identical (except that send always produces F[Unit] and always returns immediately).

As for the handler, I would be thinking in terms of GADTs, particularly if we're typing ask messages differently. I would use phantom types to encode the expected response type of an ask message, and the handler case for that message within the actor would need to produce an F[_] containing that type. send messages would of course produce F[Unit].

For the record, I still think actors are a bad abstraction. 🙂 But they're a fun space to play in, if nothing else.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment