Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save shekhargulati/07efca9d0a6a9daee7e8724228c2a91d to your computer and use it in GitHub Desktop.
Save shekhargulati/07efca9d0a6a9daee7e8724228c2a91d to your computer and use it in GitHub Desktop.
package playground
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import playground.Parent.{BulkTasks, Finish, TaskCompleted, TaskFailed}
import scala.util.{Failure, Success, Try}
/**
 * A unit of work that can be handed to a worker actor.
 */
trait Task {

  /** Identifier of this task; the parent uses it as the child actor name and as the status key. */
  val id: String

  /**
   * Runs the task for its side effects.
   *
   * The result type is spelled out explicitly: the parameterless-body "procedure syntax"
   * (`def execute()`) is deprecated in Scala 2.13 and rejected by Scala 3.
   */
  def execute(): Unit
}
/**
 * A task that always succeeds: executing it prints a confirmation line to standard output.
 *
 * @param id identifier of the task, defaults to "OK"
 */
class OkTask(val id: String = "OK") extends Task {

  /** Prints "Executed task <id>" and returns normally. */
  override def execute(): Unit = {
    println(s"Executed task $id")
  }
}
/**
 * A task that always fails: executing it throws a `RuntimeException`.
 *
 * Note that `id` has a default value while `message` does not, so a caller that wants the
 * default id has to pass `message` as a named argument.
 *
 * @param id      identifier of the task, defaults to "FailureTask"
 * @param message text carried by the exception thrown from `execute()`
 */
class FailureTask(val id: String = "FailureTask", message: String) extends Task {

  /** Always throws a `RuntimeException` whose message is `message`. */
  override def execute(): Unit = {
    val failure = new RuntimeException(message)
    throw failure
  }
}
/**
 * Companion of the [[Parent]] actor: its `Props` factory and the messages it exchanges.
 */
object Parent {

  /**
   * Creates the `Props` for a [[Parent]] actor.
   *
   * @param receiver actor that the parent notifies with [[Finish]]
   * @return props that construct `new Parent(receiver)`
   */
  def props(receiver: ActorRef): Props = Props(new Parent(receiver))

  /** Request to run every task in `tasks`. */
  final case class BulkTasks(tasks: List[Task])

  /** Sent by a child to its parent when `task` executed without throwing. */
  final case class TaskCompleted(task: Task)

  /** Sent by a child to its parent when executing `task` threw `exception`. */
  final case class TaskFailed(task: Task, exception: Throwable)

  /** Sent to the receiver with the collected status text per task id. */
  final case class Finish(statuses: Map[String, String])
}
/**
 * Coordinates batches of [[Task]]s: spawns one [[Child]] actor per task (named after the
 * task id), collects each task's outcome, and sends a single `Finish` to `receiver` once
 * no dispatched task is outstanding.
 *
 * The pending counter is additive, so the actor can be reused: a batch that arrives after a
 * previous `Finish`, or while another batch is still running, is counted correctly.
 *
 * @param receiver actor that is sent `Finish(statuses)` when all dispatched tasks have reported
 */
class Parent(receiver: ActorRef) extends Actor with ActorLogging {

  /** Number of dispatched tasks that have not reported back yet. */
  var totalTasks = 0

  /** Outcome per task id since the last `Finish`: "OK", or a description of the failure. */
  var statuses = Map[String, String]()

  override def receive: Receive = {
    case BulkTasks(Nil) => log.info("No task to process")

    case BulkTasks(tasks) =>
      // Count the whole batch up front and dispatch it in one go; this avoids re-sending the
      // shrinking list to self, which always ended with a spurious "No task to process" log.
      totalTasks += tasks.size
      tasks.foreach { task =>
        context.actorOf(Props[Child], task.id) ! task
      }

    case TaskCompleted(task) =>
      log.info(s"Completed task ${task.id}")
      recordOutcome(task, "OK")

    case TaskFailed(task, throwable) =>
      log.info(s"Task failed ${task.id}")
      // getMessage may be null (e.g. an exception created without a message); fall back to
      // toString so the status map never carries a null value.
      recordOutcome(task, Option(throwable.getMessage).getOrElse(throwable.toString))
  }

  /** Stores the status of `task`, stops the worker that ran it, and checks for completion. */
  private def recordOutcome(task: Task, status: String): Unit = {
    totalTasks -= 1
    statuses = statuses + (task.id -> status)
    // Workers are single-use: stop the child so it does not leak and its name is released.
    context.child(task.id).foreach(worker => context.stop(worker))
    checkStateAndNotify()
  }

  /** Sends `Finish` to the receiver once nothing is pending; otherwise logs the remaining count. */
  private def checkStateAndNotify(): Unit = {
    if (totalTasks == 0) {
      log.info(s"Processed all messages...Sending finish message to sender")
      receiver ! Finish(statuses)
      // Start the next batch with a clean slate; Finish keeps its own immutable snapshot.
      statuses = Map.empty
    } else {
      log.info(s"Still $totalTasks task(s) remaining")
    }
  }
}
/**
 * Worker actor: runs a single [[Task]] and reports the outcome to its parent actor as
 * either `TaskCompleted` or `TaskFailed`.
 */
class Child extends Actor with ActorLogging {
  override def receive: Receive = {
    case job: Task =>
      log.info(s"Executing task ${job.id}")
      // Try turns a non-fatal exception thrown by execute() into a Failure value, so the
      // outcome can be reported as a message instead of crashing this actor.
      val report = Try(job.execute()) match {
        case Failure(cause) => TaskFailed(job, cause)
        case Success(_)     => TaskCompleted(job)
      }
      context.parent ! report
  }
}
package playground
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{MustMatchers, WordSpecLike}
import playground.Parent.{BulkTasks, Finish}
/**
 * Exercises the [[Parent]] actor end to end, using the TestKit `testActor` as the receiver
 * of the `Finish` message.
 *
 * NOTE(review): `StopSystemAfterAll` is not defined in this file — presumably a project trait
 * that shuts the ActorSystem down after the suite; confirm it is on the test classpath.
 */
class ParentChildTaskProcessingExampleSpec extends TestKit(ActorSystem("testsystem"))
  with WordSpecLike
  with MustMatchers
  with StopSystemAfterAll {

  // The subject string must not repeat the verb: WordSpec appends "must" itself, so the
  // former "Parent must" subject produced test names reading "Parent must must ...".
  "Parent" must {

    "process all Ok BulkMessage and stop" in {
      val actor = system.actorOf(Parent.props(testActor))
      actor ! BulkTasks(List(new OkTask("ok1"), new OkTask("ok2")))
      expectMsg(Finish(Map("ok1" -> "OK", "ok2" -> "OK")))
    }

    "process Ok and Failure BulkMessage and stop" in {
      val actor = system.actorOf(Parent.props(testActor))
      actor ! BulkTasks(List(new OkTask("ok1"), new FailureTask("fa1", "Awesome Failure!!")))
      // A failed task is reported with the exception message as its status.
      expectMsg(Finish(Map("ok1" -> "OK", "fa1" -> "Awesome Failure!!")))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment