Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save raboof/d78eae72a6c10cdafd99972cb382564d to your computer and use it in GitHub Desktop.
Save raboof/d78eae72a6c10cdafd99972cb382564d to your computer and use it in GitHub Desktop.
package playground
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.util.Timeout
import akka.pattern.{ask, pipe}
import playground.Parent.{BulkTasks, Finish, TaskCompleted, TaskFailed}
trait Task {
val id: String
def execute()
}
class OkTask(val id: String = "OK") extends Task {
override def execute(): Unit = println(s"Executed task $id")
}
class FailureTask(val id: String = "FailureTask", message: String) extends Task {
override def execute(): Unit = throw new RuntimeException(message)
}
object Parent {
def props(receiver: ActorRef) = Props(new Parent(receiver))
case class BulkTasks(tasks: List[Task])
case class TaskCompleted(task: Task)
case class TaskFailed(task: Task, exception: Throwable)
case class Finish(statuses: Map[String, String])
}
class Parent(receiver: ActorRef) extends Actor with ActorLogging {
implicit val timeout: Timeout = 10 seconds
implicit val ec: ExecutionContext = context.dispatcher
override def receive: Receive = {
case BulkTasks(Nil) => log.info("No task to process")
case BulkTasks(tasks) =>
val results: List[Future[(String, String)]] = tasks
.map(task => {
val actor = context.actorOf(Props(new Child(task)), task.id)
(actor ? Child.Start).map {
case _: TaskCompleted => (task.id -> "OK")
case TaskFailed(_, throwable) => (task.id -> throwable.getMessage)
case other => (task.id -> s"Invalid response: $other")
}
})
Future.sequence(results)
.map(statuses => Finish(statuses.toMap))
.pipeTo(receiver)
}
}
object Child {
case object Start
}
class Child(task: Task) extends Actor with ActorLogging {
import Child._
override def receive: Receive = {
case Start =>
log.info(s"Executing task ${task.id}")
Try {
task.execute()
} match {
case Success(_) =>
sender() ! TaskCompleted(task)
case Failure(e) =>
sender() ! TaskFailed(task, e)
}
}
}
package playground
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestKit}
import org.scalatest.{MustMatchers, WordSpecLike}
import playground.Parent.{BulkTasks, Finish}
class ParentChildTaskProcessingExampleSpec extends TestKit(ActorSystem("testsystem"))
with WordSpecLike
with MustMatchers
with StopSystemAfterAll {
"Parent must" must {
"process all Ok BulkMessage and stop" in {
val actor = system.actorOf(Parent.props(testActor))
actor ! BulkTasks(List(new OkTask("ok1"), new OkTask("ok2")))
expectMsg(Finish(Map("ok1" -> "OK", "ok2" -> "OK")))
}
"process Ok and Failure BulkMessage and stop" in {
val actor = system.actorOf(Parent.props(testActor))
actor ! BulkTasks(List(new OkTask("ok1"), new FailureTask("fa1", "Awesome Failure!!")))
expectMsg(Finish(Map("ok1" -> "OK", "fa1" -> "Awesome Failure!!")))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment