Skip to content

Instantly share code, notes, and snippets.

@derekwyatt
Created August 16, 2012 14:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save derekwyatt/3370370 to your computer and use it in GitHub Desktop.
Save derekwyatt/3370370 to your computer and use it in GitHub Desktop.
The Terminator pattern
// Derivation implements this to create children.
// It is invoked from the CreateChildren handler in preChildren, i.e. while
// the actor is processing a message rather than directly inside preStart().
def createChildren(): Unit
// Self-addressed trigger: preStart() sends this message to the actor itself,
// and the preChildren behavior reacts to it by calling createChildren().
case object CreateChildren
// Defer child creation: instead of building the children here, queue a
// CreateChildren message to ourselves so it is handled by the receive logic.
override def preStart(): Unit =
  self ! CreateChildren
// Behavior in effect before the children exist. It reacts to the
// CreateChildren message that preStart() sends to this actor.
def preChildren: Receive = {
case CreateChildren =>
createChildren()
// possible unstash, if you're using it
// The children have been created, so switch to the behavior that serves them
context.become(withChildren)
case GetChildren(forActor) =>
// Placeholder (no code yet): the request arrived before the children were
// created -- use stash or reply with "try again later"
}
// Behavior installed by preChildren once createChildren() has run.
def withChildren: Receive = {
// Placeholder: the body is the previous 'waiting' partial function
// (presumably the Terminator's original GetChildren handler -- confirm)
}
// Behavior parameterized by the actor `to`.
// NOTE(review): presumably `to` is the actor the children were handed to, as
// in the full Terminator implementation -- confirm.
def childrenGiven(to: ActorRef): Receive = {
// Placeholder: as before (body unchanged from the earlier version)
}
import akka.actor.{Actor, ActorRef}
/**
 * Actor that obtains its workers from another actor instead of creating them.
 *
 * On start it sends `GetChildren(self)` to `surrogate` and stays in the
 * `waiting` behavior until a `Children` reply arrives, at which point it
 * switches to `initialized` with the received actors.
 *
 * @param surrogate the actor that is asked for the children
 *                  (presumably a Terminator -- confirm against the caller)
 */
class Master(surrogate: ActorRef) extends Actor {
  import Terminator._

  // Ask for the kids as soon as we are started
  override def preStart(): Unit =
    surrogate ! GetChildren(self)

  // Until the kids show up, the only message we react to is Children
  def waiting: Receive = {
    case Children(workers) =>
      // The kids are here: move on to our initialized state
      context.become(initialized(workers))
  }

  // Normal business logic goes here; this skeleton accepts every message
  // and does nothing with it
  def initialized(kids: Iterable[ActorRef]): Receive = {
    case _ => ()
  }

  // We always begin by waiting for the kids
  def receive: Receive = waiting
}
import akka.actor.{Actor, ActorRef, Terminated}
import akka.pattern.{gracefulStop, pipe}
import akka.dispatch.Future
import akka.util.Timeout
import akka.util.duration._
/**
 * Message protocol spoken between a Terminator and the actor that uses its
 * children (the "master").
 */
object Terminator {
  // Protocol between us and the master.
  // The messages are final: they are plain immutable data carriers and are
  // not meant to be extended.

  /** Request for the Terminator's children; the reply is sent to `forActor`. */
  final case class GetChildren(forActor: ActorRef)

  /** Reply to [[GetChildren]] carrying the Terminator's current children. */
  final case class Children(kids: Iterable[ActorRef])
}
/**
 * Base class for an actor that owns a set of children on behalf of another
 * actor and shuts them down in a controlled order when that actor dies.
 *
 * Life cycle:
 *  - `waiting`: the first `GetChildren(forActor)` makes us watch `forActor`,
 *    reply with `Children(children)` and switch to `childrenGiven(forActor)`.
 *  - `childrenGiven(to)`: when `to` terminates, the children are stopped one
 *    at a time in the order returned by `order`, and then this actor stops
 *    itself.
 *
 * Derivations supply `order` and are responsible for creating the children
 * (nothing in this class creates them).
 */
abstract class Terminator extends Actor {
  import Terminator._
  import context._

  // Upper bound handed to gracefulStop for each individual kid
  implicit val stopTimeout = 5.minutes

  // Piped to ourselves once killKids has stopped every kid
  case object AllDead

  // Derivations implement: returns the kids in the order they must be stopped
  def order(kids: Iterable[ActorRef]): Iterable[ActorRef]

  // Kills kids in the order given by the list.
  // Each kid is stopped with gracefulStop and the next one is only attempted
  // once the previous stop has completed. The resulting future yields AllDead
  // when the list is exhausted, or fails as soon as one gracefulStop fails.
  def killKids(kids: List[ActorRef]): Future[Any] =
    kids match {
      case Nil =>
        Future { AllDead }
      case kid :: rest =>
        // A single-element list needs no special case: `rest` is Nil and the
        // recursive call immediately yields AllDead.
        gracefulStop(kid, stopTimeout).flatMap { _ =>
          killKids(rest)
        }
    }

  // Initially we're waiting for the request for kids
  def waiting: Receive = {
    case GetChildren(forActor) =>
      // Watch the requester so that we receive Terminated when it dies
      watch(forActor)
      forActor ! Children(children)
      become(childrenGiven(forActor))
  }

  // Once we've done it, we want to lock our aspect
  // to the guy we gave them to
  def childrenGiven(to: ActorRef): Receive = {
    case GetChildren(forActor) if sender == to =>
      forActor ! Children(children)
    case Terminated(`to`) =>
      killKids(order(children).toList) pipeTo self
    case AllDead =>
      stop(self)
    case akka.actor.Status.Failure(_) =>
      // pipeTo delivers a failed future as Status.Failure, e.g. when a kid
      // did not stop within stopTimeout. Without this case the failure would
      // go unhandled and this actor would stay alive forever. Stopping
      // ourselves also stops whatever kids are still running.
      stop(self)
  }

  // Start waiting
  def receive: Receive = waiting
}
import akka.actor.{ActorSystem, Actor, Props, ActorRef}
import akka.testkit.{TestKit, ImplicitSender, TestProbe}
import org.scalatest.{WordSpec, BeforeAndAfterAll}
import org.scalatest.matchers.MustMatchers
// Our test worker will just help us see what's going
// on in our test
/**
 * Worker used by the spec: it ignores every message and, when it is stopped,
 * sends its own actor name to `a`.
 *
 * @param a the actor notified with this worker's name from postStop()
 */
class TestWorker(a: ActorRef) extends Actor {
  // Every message is accepted and dropped
  def receive: Receive = {
    case _ => ()
  }

  // Report our name so the observer can see who stopped, and in what order
  override def postStop(): Unit = {
    a ! self.path.name
  }
}
/**
 * Checks that a Terminator hands its children to the requesting actor and,
 * once that actor is stopped, stops the children in the order given by
 * `order`.
 */
class TerminatorSpec
  extends TestKit(ActorSystem("TerminatorSpec"))
  with ImplicitSender
  with WordSpec
  with BeforeAndAfterAll
  with MustMatchers {

  import Terminator._

  // Tear down the actor system once all tests have run
  override def afterAll(): Unit =
    system.shutdown()

  "Terminator" should {
    "work" in {
      // Name shared by worker creation and by the expectations below
      def workerName(index: Int): String = "Worker-%02d".format(index)

      // Instantiate a real Terminator that starts up the kids
      // in the preStart() method and provides an ordering of
      // them
      val terminator = system.actorOf(Props(new Terminator {
        override def preStart(): Unit =
          for (index <- 1 to 20)
            context.actorOf(Props(new TestWorker(testActor)), workerName(index))

        // Just for fun, we'll shut them down in reverse
        // alphabetic order
        def order(kids: Iterable[ActorRef]): Iterable[ActorRef] =
          kids.toSeq.sorted.reverse
      }))

      // Need a test probe that we can stop
      val probe = TestProbe()
      terminator.tell(GetChildren(probe.ref), probe.ref)

      // A bit of a weak test: only the number of kids is checked
      probe.expectMsgPF() {
        case Children(kids) =>
          kids.size must be (20)
      }

      // Stop the probe and wait for the 'dead' messages
      // to arrive, ensuring they're in the right order (20 down to 1)
      system.stop(probe.ref)
      for (index <- 20 to 1 by -1)
        expectMsg(workerName(index))
    }
  }
}
import akka.actor.Actor
/**
 * Demo worker that prints a line when it starts, when it stops, and for
 * every message it receives.
 */
class Worker extends Actor {
  // Announce start-up using this actor's name
  override def preStart(): Unit =
    println("%s is running".format(self.path.name))

  // Announce shutdown using this actor's name
  override def postStop(): Unit =
    println("%s has stopped".format(self.path.name))

  // Any message at all is simply echoed to stdout
  def receive: Receive = {
    case msg => println("Cool, I got a message: " + msg)
  }
}
@spodkowinski
Copy link

Any chance of adding license info to your code?

@njacobs5074
Copy link

On line 35 in Terminator.scala, you pass in a variable to TestWorker called testActor. What is this?

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment