Skip to content

Instantly share code, notes, and snippets.

@arturaz
Last active November 8, 2023 09:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arturaz/2381b85c40674f4cd15a2992e97a43fa to your computer and use it in GitHub Desktop.
Save arturaz/2381b85c40674f4cd15a2992e97a43fa to your computer and use it in GitHub Desktop.
Simple Actor implementation using cats effect and FS2
package app.utils
import cats.effect.Concurrent
import cats.effect.kernel.{DeferredSink, DeferredSource}
import fs2.concurrent.Channel
/**
* An actor that processes messages one at a time.
*/
trait Actor[F[_], Message] {
/**
* Sends the message to the actor. The [[F]] semantically blocks until the message is put into the actors mailbox.
*
* If the actor is stopped, the message is not processed.
* */
def send(message: Message): F[Either[Channel.Closed, Actor.MessageEnqueued[F]]]
/** Returns true if the actor is stopped. */
def stopped: F[Boolean]
/** Stops the actor after all messages are consumed. See [[Channel.close]]. */
def stop: F[Either[Channel.Closed, Unit]]
}
trait InspectableActor[F[_], Message, State] extends Actor[F, Message] {
/** Returns the current state of the actor. */
def state: F[State]
}
object Actor {
/**
* Indicates that the message has been enqueued in the actors mailbox.
*
* @param messageProcessed Gets completed once the message has been processed.
* */
case class MessageEnqueued[F[_]](messageProcessed: DeferredSource[F, Unit]) extends AnyVal
private case class ChannelMessage[F[_], Message](message: Message, processingCompleted: DeferredSink[F, Unit])
/**
* Creates and starts an [[Actor]].
*
* @param onMessage Transitions to the next state based on the message and the current state. The [[F]] is awaited for
* before the next message is processed.
* @param onException Invoked when [[onMessage]] fails with an exception. The [[F]] is awaited for before the next
* message is processed.
*/
def create[F[_] : Concurrent, Message, State](
initialState: State
)(
onMessage: (Message, State) => F[State],
onException: (Message, State, Throwable) => F[Unit]
): F[InspectableActor[F, Message, State]] = {
/** The actor loop. */
def consumeStream(ref: Ref[F, State], stream: fs2.Stream[F, ChannelMessage[F, Message]]): F[Unit] = {
stream
.evalTap { case ChannelMessage(message, processingCompleted) =>
for {
state <- ref.get
_ <- onMessage(message, state).attempt.flatMap {
case Left(e) => onException(message, state, e)
case Right(newState) => ref.set(newState) *> processingCompleted.complete(()).void
}
} yield ()
}
.compile
.drain
}
for {
inbox <- Channel.unbounded[F, ChannelMessage[F, Message]]
ref <- Ref[F].of(initialState)
_ <- consumeStream(ref, inbox.stream).start
} yield new InspectableActor[F, Message, State] {
override def send(message: Message): F[Either[Channel.Closed, MessageEnqueued[F]]] = {
for {
deferred <- Deferred[F, Unit]
either <- inbox.send(ChannelMessage(message, deferred))
} yield either.as(MessageEnqueued(deferred))
}
override def state: F[State] =
ref.get
override def stopped: F[Boolean] =
inbox.isClosed
override def stop: F[Either[Channel.Closed, Unit]] =
inbox.close
}
}
/**
* As [[create]] but the actor does not have a state.
*/
def createStateless[F[_] : Concurrent, Message](
onMessage: Message => F[Unit],
onException: (Message, Throwable) => F[Unit]
): F[Actor[F, Message]] = create[F, Message, Unit](())(
onMessage = (msg, _) => onMessage(msg),
onException = (msg, _, e) => onException(msg, e)
).map(actor => actor: Actor[F, Message])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment