Skip to content

Instantly share code, notes, and snippets.

@hanishi
Last active November 6, 2022 13:22
Show Gist options
  • Save hanishi/c00e5c16c5375c0285cc08e1ac0efc57 to your computer and use it in GitHub Desktop.
Save hanishi/c00e5c16c5375c0285cc08e1ac0efc57 to your computer and use it in GitHub Desktop.
Sample parent-child actors with aggregator for collecting the status of child actors.
package actors
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag
//see https://github.com/akka/akka/blob/v2.7.0/akka-actor-typed-tests/src/test/scala/docs/akka/typed/Aggregator.scala
/**
 * Short-lived actor that sends out a batch of requests, collects the replies and forwards a
 * single aggregated value to `replyTo` before stopping itself.
 *
 * Adapted from the Akka documentation sample referenced above.
 */
object Aggregator {
  // NOTE: this alias is shadowed by the `Reply` type parameter of `apply` and is not used by the
  // code below; it is kept only so existing references to `Aggregator.Reply` keep compiling.
  type Reply = Any

  /**
   * Creates the aggregating behavior.
   *
   * @param sendRequests     invoked once during setup with the adapter the replies must be sent to
   * @param replyTo          receives the aggregate produced by `aggregateReplies`
   * @param aggregateReplies folds the replies collected so far into the aggregate
   * @param expectedReplies  number of replies to wait for; when it is zero or negative the
   *                         aggregate of an empty collection is sent immediately
   * @param timeout          installed as the actor's receive timeout; when it fires, whatever has
   *                         been collected so far is aggregated and sent
   * @tparam Reply           reply type; the `ClassTag` lets the pattern below test for it at runtime
   * @tparam Aggregate       type of the aggregated result
   */
  def apply[Reply: ClassTag, Aggregate](
    sendRequests: => ActorRef[Reply] => Unit,
    replyTo: ActorRef[Aggregate],
    aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate,
    expectedReplies: Int,
    timeout: FiniteDuration
  ): Behavior[Command] = {
    Behaviors.setup { context =>
      context.setReceiveTimeout(timeout, ReceiveTimeout)
      val replyAdapter = context.messageAdapter[Reply](WrappedReply(_))
      sendRequests(replyAdapter)

      // Sends the aggregate of `replies` to the requester and stops this actor.
      def complete(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = {
        replyTo ! aggregateReplies(replies)
        Behaviors.stopped
      }

      def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] =
        Behaviors.receiveMessagePartial {
          case WrappedReply(reply: Reply) =>
            context.log.info("reply received")
            val newReplies = replies :+ reply
            if (newReplies.size >= expectedReplies) complete(newReplies)
            else collecting(newReplies)
          case ReceiveTimeout =>
            context.log.info("Aggregator timed out!")
            complete(replies)
        }

      // Nothing to wait for: answer right away instead of letting the caller wait for the
      // receive timeout (the size check in `collecting` could never be satisfied).
      if (expectedReplies <= 0) complete(Vector.empty)
      else collecting(Vector.empty)
    }
  }

  sealed trait Command
  // Reply of a queried actor, wrapped by the message adapter created in `apply`.
  private case class WrappedReply[R](reply: R) extends Command
  // Delivered when the receive timeout set in `apply` fires.
  private case object ReceiveTimeout extends Command
}
app.actors.stash-buffer-size=100
app.actors.ask-timeout=30
app.actors.termination-timeout=1
app.actors.retry-exponent-base=2
# app.actors.retry-count=
package actors
import actors.Child.{INITIALIZING, State}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer, TimerScheduler}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.pattern.StatusReply
import play.api.Configuration
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.{DurationDouble, DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Random, Success}
/**
 * Snapshot of a child as reported by its actor.
 *
 * @param id       identifier of the child
 * @param state    current lifecycle state; a new child starts as `INITIALIZING`
 * @param created  creation timestamp; defaults to the moment the instance is built
 * @param duration seconds elapsed since `created`, as last computed by `withDuration`
 */
case class Child(
  id: Int,
  state: State = INITIALIZING,
  created: LocalDateTime = LocalDateTime.now(),
  duration: Long = 0L
) {

  /** Copy of this child whose `duration` is the number of whole seconds between `created` and now. */
  def withDuration: Child = {
    val elapsedSeconds = ChronoUnit.SECONDS.between(created, LocalDateTime.now())
    copy(duration = elapsedSeconds)
  }

  /** Copy of this child placed in the given `state`. */
  def transition(state: State): Child = copy(state = state)
}
/** Companion of the `Child` case class: message protocol, lifecycle states and actor behaviors. */
object Child {
// Loads the child with the given id. This sample returns an already-completed Future holding a fresh Child.
def load(id: Int): Future[Child] = Future.successful(Child(id))
// Lifecycle states stored in `Child.state`; the concrete states are the case objects declared further down.
sealed trait State
// Every message the child actor can receive.
sealed trait Command
// --- Public requests. Each carries the ActorRef that receives the StatusReply. ---
// Asks for the current snapshot of the child.
case class Status(replyTo: ActorRef[StatusReply[Child]]) extends Command
// Runs bye(); the actor stops after a GoodBye has been sent back.
case class Bye(replyTo: ActorRef[StatusReply[Message]]) extends Command
// Runs hello().
case class Hello(replyTo: ActorRef[StatusReply[Message]]) extends Command
// Runs startTask() followed by executeTask(); failures are retried while retries remain.
case class StartTask(replyTo: ActorRef[StatusReply[Message]]) extends Command
// --- Internal messages, produced by piping futures back to the actor or by the actor itself. ---
// The hello()/bye() future completed successfully.
private case class CommandSucceeded(hi: Message) extends Command
// A task future failed; `command` is to be re-run and `count` is the number of retries left.
private case class Retry(command: Command, count: Int) extends Command
// The startTask() future completed with the id of the started task.
private case class TaskStarted(id: Int) extends Command
// Self-sent after TaskStarted to run executeTask(id) for the original requester.
private case class ExecuteTask(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Command
// The executeTask() future completed successfully.
private case class TaskExecuted(result: Message) extends Command
// load(id) succeeded.
private case class InitializationSucceeded(child: Child) extends Command
// A piped future failed and is not (or no longer) retried.
private case class CommandFailed(throwable: Throwable) extends Command
// load(id) failed.
private case class InitializationFailed(id: Int, throwable: Throwable) extends Command
// Holds the behaviors of one child actor together with the context, stash buffer and timers
// they share. Instances are created by the `Actor.apply` factory of this companion object.
private class Actor(
context: ActorContext[Command],
buffer: StashBuffer[Command],
timers: TimerScheduler[Command],
config: Configuration
)(implicit ec: ExecutionContext) {
// Base of the exponential retry delay; read from EXPONENT_BASE, falling back to DEFAULT_EXPONENT_BASE.
val EXPONENT_BASE_VALUE: Int = config.getOptional[Int](EXPONENT_BASE).getOrElse(DEFAULT_EXPONENT_BASE)
// Delay of the Terminate timer started whenever the actor becomes idle; the configured number is interpreted as minutes.
val TIMEOUT_DURATION: FiniteDuration = config.getOptional[Int](TERMINATION_TIME_OUT).getOrElse(DEFAULT_TERMINATION_TIME_OUT).minutes
// Number of retries granted to a task command before its failure is reported to the requester.
val INITIAL_RETRY_COUNT: Int = config.getOptional[Int](RETRY_COUNT).getOrElse(DEFAULT_RETRY_COUNT)
/**
 * Initial behavior: waits for the outcome of `load`.
 *
 * On success the actor becomes idle with the loaded child; on failure it logs the error and
 * stops. Every other message is stashed until initialization has finished.
 */
def initializing(): Behavior[Command] =
  Behaviors.receiveMessage { message =>
    message match {
      case InitializationSucceeded(loaded) =>
        idle(loaded)
      case InitializationFailed(failedId, cause) =>
        context.log.error(s"Initialization failed for $failedId", cause)
        Behaviors.stopped
      case pending =>
        buffer.stash(pending)
        Behaviors.same
    }
  }
/**
 * Behavior while the future of a `Hello` or `Bye` request is outstanding.
 *
 * `Status` is answered directly. The piped result is forwarded to `replyTo`, after which the
 * actor either stops (when the result is a `GoodBye`) or becomes idle again. Other requests
 * are stashed and replayed once the actor is idle.
 *
 * @param child   snapshot of the child while it is waiting
 * @param replyTo requester of the command being processed
 */
private def responding(
  child: Child,
  replyTo: ActorRef[StatusReply[Message]]
): Behavior[Command] =
  Behaviors.receiveMessagePartial {
    case Status(statusReplyTo) =>
      statusReplyTo ! StatusReply.Success(child.withDuration)
      Behaviors.same
    case CommandSucceeded(message) =>
      replyTo ! StatusReply.Success(message)
      message match {
        case GoodBye(_) =>
          Behaviors.stopped
        case _ => idle(child)
      }
    case CommandFailed(throwable) =>
      replyTo ! StatusReply.Error(throwable)
      idle(child)
    case Terminate =>
      // The idle timeout fired while a request is in flight, so the actor is clearly in use.
      // It must not be stashed: idle() restarts the timer and then replays the stash, so a
      // stashed Terminate would stop the actor right after it had served a request.
      Behaviors.same
    case other =>
      buffer.stash(other)
      Behaviors.same
  }
/**
 * Behavior while a task future (`startTask` or `executeTask`) is outstanding.
 *
 * `Status` is answered directly. `TaskStarted` schedules the execution step, `Retry` re-runs the
 * failed command with the remaining retry budget, and `TaskExecuted` / `CommandFailed` report
 * the outcome to `replyTo` before the actor becomes idle again. Messages not listed here are
 * left unhandled.
 *
 * @param child   snapshot of the child while the task is running
 * @param replyTo requester of the task
 */
private def runningTask(child: Child, replyTo: ActorRef[StatusReply[Message]]): Behavior[Command] = {
  // Reports the final outcome to the requester and returns to the idle state.
  def finishWith(outcome: StatusReply[Message]): Behavior[Command] = {
    replyTo ! outcome
    idle(child)
  }

  Behaviors.receiveMessagePartial {
    case Status(statusReplyTo) =>
      statusReplyTo ! StatusReply.Success(child.withDuration)
      Behaviors.same
    case TaskStarted(taskId) =>
      idle(child, ExecuteTask(taskId, replyTo))
    case TaskExecuted(result) =>
      finishWith(StatusReply.Success(result))
    case Retry(command, remaining) =>
      context.log.info(s"$remaining retries left")
      idle(child, command, remaining)
    case CommandFailed(throwable) =>
      finishWith(StatusReply.Error(throwable))
  }
}
/**
 * Makes the actor idle: (re)starts the single `Terminate` timer with `TIMEOUT_DURATION`, marks
 * the child `READY` and replays every stashed message into the `active` behavior.
 */
private def idle(child: Child): Behavior[Command] = {
  val timerRunning = timers.isTimerActive(TimeoutKey)
  if (timerRunning) timers.cancel(TimeoutKey)
  timers.startSingleTimer(TimeoutKey, Terminate, TIMEOUT_DURATION)

  val readyChild = child.transition(READY)
  buffer.unstashAll(active(readyChild))
}
/**
 * Hands `command` back to this actor and switches to `active` with the given retry budget.
 *
 * The idle `Terminate` timer is cancelled first. When `count` is still the initial budget the
 * command is sent to self immediately; otherwise it is delivered by a single timer after
 * `EXPONENT_BASE_VALUE ^ (INITIAL_RETRY_COUNT - count)` seconds, so the delay grows with every
 * retry that has been used up.
 *
 * @param child   current snapshot of the child
 * @param command command to (re-)run
 * @param count   retries left for `command`
 */
private def idle(child: Child, command: Command, count: Int = INITIAL_RETRY_COUNT): Behavior[Command] = {
  if (timers.isTimerActive(TimeoutKey)) timers.cancel(TimeoutKey)
  if (count < INITIAL_RETRY_COUNT) {
    val delay = Math.pow(EXPONENT_BASE_VALUE, INITIAL_RETRY_COUNT - count).seconds
    // `delay` is a FiniteDuration and already prints its unit, so none is appended here.
    context.log.debug(s"retrying $command in $delay")
    // The command itself is used as the timer key.
    timers.startSingleTimer(command, delay)
  } else context.self ! command
  active(child, count)
}
/**
 * Behavior of a child that is ready to accept requests.
 *
 * Each request starts the matching future, pipes its outcome back to this actor and switches to
 * `responding` (for `Hello` / `Bye`) or `runningTask` (for `StartTask` / `ExecuteTask`).
 * `Terminate` stops the actor.
 *
 * @param child      current snapshot of the child
 * @param retryCount retries left for a task command handled in this state
 */
private def active(child: Child, retryCount: Int = INITIAL_RETRY_COUNT): Behavior[Command] = {
  // Outcome of a failed task future: another attempt while retries remain, a failure otherwise.
  def retryOrFail(command: Command, cause: Throwable): Command =
    if (retryCount > 0) Retry(command, retryCount - 1)
    else CommandFailed(cause)

  Behaviors.receiveMessagePartial {
    case Status(replyTo) =>
      replyTo ! StatusReply.Success(child.withDuration)
      Behaviors.same

    case Hello(replyTo) =>
      context.pipeToSelf(hello()) {
        case Success(hi)        => CommandSucceeded(hi)
        case Failure(throwable) => CommandFailed(throwable)
      }
      responding(child.transition(WAITING), replyTo)

    case command @ StartTask(replyTo) =>
      context.pipeToSelf(startTask()) {
        case Success(id)        => TaskStarted(id)
        case Failure(throwable) => retryOrFail(command, throwable)
      }
      runningTask(child.transition(RUNNING), replyTo)

    case command @ ExecuteTask(id, replyTo) =>
      context.pipeToSelf(executeTask(id)) {
        case Success(result)    => TaskExecuted(result)
        case Failure(throwable) => retryOrFail(command, throwable)
      }
      runningTask(child.transition(RUNNING), replyTo)

    case Bye(replyTo) =>
      context.pipeToSelf(bye()) {
        case Success(goodbye)   => CommandSucceeded(goodbye)
        case Failure(throwable) => CommandFailed(throwable)
      }
      responding(child.transition(WAITING), replyTo)

    case Terminate =>
      context.log.debug(s"terminating actor for child: ${child.id}")
      Behaviors.stopped
  }
}
// Sample operations. Each one runs on the implicit ExecutionContext of the enclosing class.

/** Produces the greeting returned for a `Hello` request. */
private def hello(): Future[Hi] = Future(Hi("Hello my friend!"))

/** Starts a task and yields its id, a random non-negative Int. */
private def startTask(): Future[Int] = Future(Random.nextInt(Int.MaxValue))

/** Simulates a long-running task: sleeps for 15 seconds, then yields a random result. */
private def executeTask(id: Int): Future[TaskResult] =
  Future {
    Thread.sleep(15000)
    val outcome = Random.nextLong(Long.MaxValue)
    TaskResult(outcome)
  }
// Presumably an alternative body for trying out the retry path:
// Future.failed(new RuntimeException("BOOM!"))

/** Produces the farewell returned for a `Bye` request. */
private def bye(): Future[GoodBye] = Future(GoodBye("Bye my friend!"))
}
/** Default state of a newly created `Child`. */
case object INITIALIZING extends State {
  override def toString: String = productPrefix
}

/** Set whenever the actor becomes idle and can accept requests. */
case object READY extends State {
  override def toString: String = productPrefix
}

/** Set while a `StartTask` / `ExecuteTask` future is outstanding. */
case object RUNNING extends State {
  override def toString: String = productPrefix
}

/** Set while a `Hello` / `Bye` future is outstanding. */
case object WAITING extends State {
  override def toString: String = productPrefix
}
/** Factory for the behavior of a child actor. */
object Actor {

  /**
   * Builds the supervised behavior of the child with the given `id`.
   *
   * The behavior owns a stash buffer (capacity read from `STASH_BUFFER_SIZE`, falling back to
   * `DEFAULT_STASH_BUFFER_SIZE`) and a timer scheduler. On start it pipes the outcome of `load`
   * to itself and waits in `initializing`. Any failure stops the actor.
   */
  def apply(id: Int, configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] = {
    val stashCapacity =
      configuration.getOptional[Int](STASH_BUFFER_SIZE).getOrElse(DEFAULT_STASH_BUFFER_SIZE)

    Behaviors
      .supervise[Command](
        Behaviors.withStash(stashCapacity) { buffer =>
          Behaviors.setup { context =>
            Behaviors.withTimers { timers =>
              context.pipeToSelf(load(id)) {
                case Success(child)     => InitializationSucceeded(child)
                case Failure(throwable) => InitializationFailed(id, throwable)
              }
              val actor = new Actor(context, buffer, timers, configuration)
              actor.initializing()
            }
          }
        }
      )
      .onFailure(SupervisorStrategy.stop)
  }
}
// Sent by the idle timer; makes an active child stop itself.
private case object Terminate extends Command
// Key under which the idle timer is registered with the TimerScheduler.
private case object TimeoutKey
}
import actors.Parent
import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport
/** Guice module that binds the typed parent actor (`Parent.Actor`) under the name "parent". */
class Module extends AbstractModule with AkkaGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(Parent.Actor, "parent")
  }
}
/** Configuration keys, their fallback values and the reply messages shared by the actors. */
package object actors {
  // Configuration keys.
  val STASH_BUFFER_SIZE: String = "app.actors.stash-buffer-size"
  val ASK_TIME_OUT: String = "app.actors.ask-timeout"
  val TERMINATION_TIME_OUT: String = "app.actors.termination-timeout"
  val EXPONENT_BASE: String = "app.actors.retry-exponent-base"
  val RETRY_COUNT: String = "app.actors.retry-count"

  // Values used when the corresponding key is not configured.
  val DEFAULT_ASK_TIMEOUT: Int = 30
  val DEFAULT_STASH_BUFFER_SIZE: Int = 100
  val DEFAULT_TERMINATION_TIME_OUT: Int = 1
  val DEFAULT_EXPONENT_BASE: Int = 2
  val DEFAULT_RETRY_COUNT: Int = 3

  /** Payload of a successful reply sent back to a requester. */
  trait Message

  /** Reply to a hello request. */
  case class Hi(message: String) extends Message

  /** Reply to a bye request. */
  case class GoodBye(message: String) extends Message

  /** Reply carrying the result of a finished task. */
  case class TaskResult(result: Long) extends Message
}
package actors
import actors.Parent.Command
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.pattern.StatusReply
import akka.util.Timeout
import com.google.inject.Provides
import play.api.Configuration
import play.api.libs.concurrent.ActorModule
import scala.collection.Seq
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{Failure, Random, Success}
/**
 * A parent together with its children, usable as a `Parent.Command`.
 * NOTE(review): not referenced by any behavior visible in this file; confirm it is still needed.
 */
case class Parent(children: Seq[Child]) extends Command
/** Companion of the `Parent` case class: message protocol and actor behaviors of the parent. */
object Parent {
// Passed to the Aggregator as its timeout when collecting the status of the children.
val aggregationTimeout: FiniteDuration = 1.second
// Yields the ids of the initial children. This sample returns an already-completed Future of random non-negative ids.
def initialize(numberOfChildren: Int): Future[Seq[Int]] = Future.successful(Seq.fill(numberOfChildren)(Random.nextInt(Int.MaxValue)))
// Every message the parent actor can receive.
sealed trait Command
// A request addressed to the child with the given id; the StatusReply is sent to `replyTo`.
sealed trait Request[T] extends Command {
def id: Int
def replyTo: ActorRef[StatusReply[T]]
}
// Produced by the Aggregator (or directly when there are no children): the collected snapshots plus the original requester.
case class Aggregated(children: Seq[Child], replyTo: ActorRef[StatusReply[StatusResult]]) extends Command
// Payload of the reply to a Status request.
case class StatusResult(children: Seq[Child])
// Asks for the status of all children; the handlers visible in this file do not use `id`.
case class Status(id: Int, replyTo: ActorRef[StatusReply[StatusResult]]) extends Request[StatusResult]
// Forwarded to the addressed child as Child.StartTask.
case class StartTask(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message]
// Forwarded to the addressed child as Child.Hello.
case class Hello(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message]
// Forwarded to the addressed child as Child.Bye.
case class Bye(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message]
// --- Internal messages ---
// initialize() failed.
private case class InitializationFailed(throwable: Throwable) extends Command
// initialize() succeeded with the ids of the children to spawn.
private case class InitializationSucceeded(ids: Seq[Int]) extends Command
// A freshly spawned child answered the Status ask; `request` is the request that triggered the spawn.
private case class ChildConfirmed[T](child: Child, request: Request[T]) extends Command
// Watch notification registered for the child with the given id.
private case class ChildTerminated(id: Int) extends Command
// The ask to a child succeeded; `response` goes to `replyTo`.
private case class RequestSucceeded(response: Message, replyTo: ActorRef[StatusReply[Message]]) extends Command
// The ask to a child failed for `request`.
private case class RequestFailed[T](throwable: Throwable, request: Request[T]) extends Command
// Holds the behaviors of the parent actor together with the context and stash buffer they share.
// The implicit timeout is used by the asks sent to the children.
private class Actor(configuration: Configuration,
context: ActorContext[Command],
buffer: StashBuffer[Command])(implicit timeout: Timeout, ec: ExecutionContext) {
/**
 * Initial behavior: waits for the ids of the children to spawn.
 *
 * On success one child is spawned per id, added to `actorRefs`, and stashed messages are
 * replayed into `active`. On failure the error is logged and rethrown. Every other message is
 * stashed.
 *
 * @param actorRefs children already known, keyed by id
 */
def initializing(actorRefs: Map[Int, ActorRef[Child.Command]] = Map.empty): Behavior[Command] =
  Behaviors.receiveMessage {
    case InitializationSucceeded(ids) =>
      val spawned = ids.map(id => id -> spawnChild(id))
      buffer.unstashAll(active(actorRefs ++ spawned))
    case InitializationFailed(cause) =>
      context.log.error("Initialization Failed")
      throw cause
    case pending =>
      buffer.stash(pending)
      Behaviors.same
  }
/**
 * Behavior while an ask to `actorRef` is outstanding.
 *
 * `Status` requests are still served (through an `Aggregator` when there are children). The
 * outcome of the ask (`ChildConfirmed`, `RequestSucceeded` or `RequestFailed`) returns the
 * parent to `active`; every other message is stashed until then.
 *
 * @param actorRefs children currently known, keyed by id
 * @param actorRef  the child whose answer is awaited
 */
def idle(actorRefs: Map[Int, ActorRef[Child.Command]], actorRef: ActorRef[Child.Command]): Behavior[Command] =
  Behaviors.receiveMessage {
    case Status(_, replyTo) =>
      if (actorRefs.isEmpty) context.self ! Aggregated(Seq.empty, replyTo)
      else
        context.spawnAnonymous(
          Aggregator[StatusReply[Child], Aggregated](
            adapter => actorRefs.values.foreach(_ ! Child.Status(adapter)),
            context.self,
            replies => Aggregated(replies.map(_.getValue), replyTo),
            actorRefs.size,
            aggregationTimeout
          )
        )
      Behaviors.same

    case Aggregated(snapshots, replyTo) =>
      replyTo ! StatusReply.Success(StatusResult(snapshots))
      Behaviors.same

    case ChildConfirmed(confirmed, request) =>
      context.log.info(s"created $confirmed")
      // Re-submit the request that triggered the spawn now that the child is registered.
      context.self ! request
      idle(actorRefs + (confirmed.id -> actorRef))

    case RequestSucceeded(response, replyTo) =>
      replyTo ! StatusReply.Success(response)
      idle(actorRefs)

    case RequestFailed(cause, request) =>
      context.stop(actorRef)
      request.replyTo ! StatusReply.Error(cause)
      idle(actorRefs - request.id)

    case pending =>
      buffer.stash(pending)
      Behaviors.same
  }
/**
 * Behavior of a parent that is ready to accept requests.
 *
 * `Status` collects the snapshots of all children through an `Aggregator`. `Hello`, `Bye` and
 * `StartTask` are forwarded to the addressed child with an ask, after which the parent waits
 * in `idle(actorRefs, child)`. `ChildTerminated` removes the child from the registry.
 *
 * @param actorRefs children currently known, keyed by id
 */
def active(actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = {
  // Asks the addressed child (spawning it first when unknown) and waits for its answer.
  def forward(request: Request[Message])(
    toChildCommand: ActorRef[StatusReply[Message]] => Child.Command
  ): Behavior[Command] =
    handleRequest(request, actorRefs) { child =>
      context.askWithStatus(child, toChildCommand) {
        case Success(message) =>
          RequestSucceeded(message, request.replyTo)
        case Failure(throwable) =>
          RequestFailed(throwable, request)
      }
      idle(actorRefs, child)
    }

  Behaviors.receiveMessagePartial {
    case Status(_, replyTo) =>
      if (actorRefs.isEmpty) context.self ! Aggregated(Seq.empty, replyTo)
      else
        context.spawnAnonymous(
          Aggregator[StatusReply[Child], Aggregated](
            adapter => actorRefs.values.foreach(_ ! Child.Status(adapter)),
            context.self,
            replies => Aggregated(replies.map(_.getValue), replyTo),
            actorRefs.size,
            aggregationTimeout
          )
        )
      Behaviors.same

    case Aggregated(snapshots, replyTo) =>
      replyTo ! StatusReply.Success(StatusResult(snapshots))
      Behaviors.same

    case request @ Hello(id, _) =>
      context.log.debug(s"hello received for child: $id")
      forward(request)(Child.Hello(_))

    case request @ Bye(id, _) =>
      context.log.debug(s"bye received for child: $id")
      forward(request)(Child.Bye(_))

    case request @ StartTask(id, _) =>
      context.log.debug(s"start task received for child: $id")
      forward(request)(Child.StartTask(_))

    case ChildTerminated(id) =>
      context.log.info(s"child with $id terminated")
      idle(actorRefs - id)
  }
}
/** Spawns an anonymous child actor for `id` and registers `ChildTerminated(id)` as its watch message. */
def spawnChild(id: Int): ActorRef[Child.Command] = {
  val childBehavior = Child.Actor(id, configuration)
  val spawned = context.spawnAnonymous(childBehavior)
  context.watchWith(spawned, ChildTerminated(id))
  spawned
}
/**
 * Applies `behavior` to the child addressed by `request`; when no such child is registered, a
 * new one is spawned through `loadChild` instead.
 */
def handleRequest[T](request: Request[T], actorRefs: Map[Int, ActorRef[Child.Command]])(
  behavior: ActorRef[Child.Command] => Behavior[Command]
): Behavior[Command] =
  actorRefs.get(request.id) match {
    case Some(existing) => behavior(existing)
    case None           => loadChild[T](request, actorRefs)
  }
/** Logs the current children and replays every stashed message into `active(actorRefs)`. */
private def idle(actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = {
  context.log.info(s"current children: ${actorRefs}")
  val next = active(actorRefs)
  buffer.unstashAll(next)
}
/**
 * Spawns and watches a child for the id of `request`, then asks it for its status and waits
 * for the confirmation (see `confirm`).
 */
private def loadChild[T](request: Request[T], actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = {
  context.log.info(s"creating child for ${request.id}")
  // spawnChild performs the same spawnAnonymous + watchWith pair for the given id.
  val spawned = spawnChild(request.id)
  confirm(request, actorRefs, spawned)
}
/**
 * Asks the freshly spawned `actorRef` for its status and waits for the answer.
 *
 * A successful answer becomes `ChildConfirmed`, a failure becomes `RequestFailed`. Both are
 * handled by `idle(actorRefs, actorRef)`.
 */
private def confirm[T](
  request: Request[T],
  actorRefs: Map[Int, ActorRef[Child.Command]],
  actorRef: ActorRef[Child.Command]
): Behavior[Command] = {
  context.askWithStatus(actorRef, Child.Status) {
    case Failure(cause) =>
      RequestFailed(cause, request)
    case Success(snapshot) =>
      ChildConfirmed(snapshot, request)
  }
  idle(actorRefs, actorRef)
}
}
/** Factory for the behavior of the parent actor, exposed to Guice as an `ActorModule`. */
object Actor extends ActorModule {
  override type Message = Command

  /**
   * Builds the supervised behavior of the parent.
   *
   * The behavior owns a stash buffer and logs its messages. On start it pipes the outcome of
   * `initialize(4)` to itself and waits in `initializing`. Failures restart the actor.
   *
   * The stash capacity and the ask timeout are read from the configuration; both fall back to
   * their defaults when the key is missing, matching how `Child.Actor` reads its settings.
   */
  @Provides def apply(configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] = Behaviors
    .supervise[Command](
      Behaviors.withStash(
        // A missing key falls back to the default instead of failing actor creation.
        configuration.getOptional[Int](STASH_BUFFER_SIZE).getOrElse(DEFAULT_STASH_BUFFER_SIZE)
      ) { buffer =>
        Behaviors
          .setup { context =>
            Behaviors.logMessages {
              implicit val timeout: Timeout =
                configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(DEFAULT_ASK_TIMEOUT).seconds
              initialize(4).pipe(context.pipeToSelf) {
                case Success(ids) =>
                  InitializationSucceeded(ids)
                case Failure(throwable) =>
                  InitializationFailed(throwable)
              }
              new Actor(configuration, context, buffer).initializing()
            }
          }
      }
    ).onFailure(SupervisorStrategy.restart)
}
}
package models
import actors.Parent._
import actors.{ASK_TIME_OUT, Message}
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.{ActorRef, Scheduler}
import akka.util.Timeout
import play.api.Configuration
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.DurationInt
/**
 * Future-based facade over the parent actor: every method asks the parent and completes with
 * the value of the `StatusReply`, or fails when the reply is an error or the ask times out.
 *
 * NOTE(review): this class is declared in package `models`, while the controller in this
 * listing imports `usecase.ParentChild`; confirm which package is intended.
 */
class ParentChild @Inject()(configuration: Configuration,
                            parent: ActorRef[Command])(implicit scheduler: Scheduler, ex: ExecutionContext) {

  // Ask timeout in seconds. Falls back to the default when the key is missing, matching the
  // way the parent actor reads the same setting.
  implicit val timeout: Timeout =
    configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(actors.DEFAULT_ASK_TIMEOUT).seconds

  /** Sends `Hello` to the child with the given id. */
  def hello(id: Int): Future[Message] = parent.askWithStatus[Message](Hello(id, _))

  /** Sends `Bye` to the child with the given id. */
  def bye(id: Int): Future[Message] = parent.askWithStatus(Bye(id, _))

  /** Starts a task on the child with the given id and completes with its result. */
  def startTask(id: Int): Future[Message] = parent.askWithStatus(StartTask(id, _))

  /** Collects the status of the children; the parent's handlers do not use `id`. */
  def status(id: Int): Future[StatusResult] = parent.askWithStatus(Status(id, _))
}
package controllers
import actors.Parent.StatusResult
import com.google.inject.Singleton
import actors.{Child, GoodBye, Hi, TaskResult}
import akka.actor.typed.Scheduler
import play.api.libs.json.Json
import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents}
import usecase.ParentChild
import java.time.format.DateTimeFormatter
import javax.inject.Inject
import scala.concurrent.ExecutionContext
/**
 * HTTP endpoints exposing the parent/child sample. Each action delegates to `ParentChild` and
 * renders the reply as JSON; a failed request is rendered as a 500 response carrying the error
 * message.
 */
@Singleton class ParentChildController @Inject()(cc: ControllerComponents, parentChild: ParentChild)
                                                (implicit ec: ExecutionContext, scheduler: Scheduler)
  extends AbstractController(cc) {

  val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss")

  // Shared failure handling for all actions. Falls back to the exception's toString when it
  // carries no message, so the JSON body is never built from a null string.
  private def internalError: PartialFunction[Throwable, play.api.mvc.Result] = {
    case scala.util.control.NonFatal(e) =>
      e.printStackTrace()
      InternalServerError(Json.toJson(Option(e.getMessage).getOrElse(e.toString)))
  }

  /** Greets the child with the given id. */
  def hello(id: Int): Action[AnyContent] = Action.async {
    parentChild.hello(id).map {
      case Hi(message) => Ok(Json.obj("id" -> id, "message" -> message))
    }.recover(internalError)
  }

  /** Says goodbye to the child with the given id. */
  def bye(id: Int): Action[AnyContent] = Action.async {
    parentChild.bye(id).map {
      case GoodBye(message) => Ok(Json.obj("id" -> id, "message" -> message))
    }.recover(internalError)
  }

  /** Runs a task on the child with the given id and returns its result. */
  def startTask(id: Int): Action[AnyContent] = Action.async {
    parentChild.startTask(id).map {
      case TaskResult(result) => Ok(Json.obj("id" -> id, "result" -> result))
    }.recover(internalError)
  }

  /** Lists the status of the children. */
  def status(id: Int): Action[AnyContent] = Action.async {
    // id is not used
    parentChild.status(id).map {
      case StatusResult(children) =>
        Ok(Json.toJson(children.map(child =>
          Json.obj("id" -> child.id, "created" -> child.created.format(dateTimeFormatter),
            "duration" -> s"${child.duration} seconds",
            "state" -> child.state.toString))))
    }.recover(internalError) // same failure handling as the other actions
  }
}
GET /parent_child/:id/hello controllers.ParentChildController.hello(id: Int)
GET /parent_child/:id/bye controllers.ParentChildController.bye(id: Int)
GET /parent_child/:id/startTask controllers.ParentChildController.startTask(id: Int)
# NOTE: the id path parameter is not used by the status action
GET /parent_child/:id/status controllers.ParentChildController.status(id: Int)
@hanishi
Copy link
Author

hanishi commented Nov 4, 2022

  1. It demonstrates a sample parent-child actor structure.
  2. The parent uses an Aggregator to collect the status of the children it has spawned.
  3. Child actors terminate themselves when they have not been used for a configurable period of time.
  4. Child actors can retry a failed request using exponential backoff.

@hanishi
Copy link
Author

hanishi commented Nov 5, 2022

Every place where Future.successful is used should be replaced with your actual implementation, running on a different thread. Otherwise the method is executed on the calling thread, which blocks the actor from transitioning to its next behavior until the method returns.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment