Created
August 16, 2012 14:10
-
-
Save derekwyatt/3370370 to your computer and use it in GitHub Desktop.
The Terminator pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Derivation implements this to create children | |
def createChildren(): Unit | |
case object CreateChildren | |
override def preStart() { | |
self ! CreateChildren | |
} | |
def preChildren: Receive = { | |
case CreateChildren => | |
createChildren() | |
// possible unstash, if you're using it | |
context.become(withChildren) | |
case GetChildren(forActor) => | |
// use stash or reply with "try again later" | |
} | |
def withChildren: Receive = { | |
// previous 'waiting' partial function | |
} | |
def childrenGiven(to: ActorRef): Receive = { | |
// as before | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorRef} | |
class Master(surrogate: ActorRef) extends Actor { | |
import Terminator._ | |
// Askf for the kids | |
override def preStart() { | |
surrogate ! GetChildren(self) | |
} | |
// Wait for the kids to show up | |
def waiting: Receive = { | |
case Children(kids) => | |
// become our initialized state | |
context.become(initialized(kids)) | |
} | |
// Do our normal business logic | |
def initialized(kids: Iterable[ActorRef]): Receive = { | |
case _ => | |
} | |
// Start waiting | |
def receive = waiting | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorRef, Terminated} | |
import akka.pattern.{gracefulStop, pipe} | |
import akka.dispatch.Future | |
import akka.util.Timeout | |
import akka.util.duration._ | |
object Terminator { | |
// Protocol between us and the master | |
case class GetChildren(forActor: ActorRef) | |
case class Children(kids: Iterable[ActorRef]) | |
} | |
abstract class Terminator extends Actor { | |
import Terminator._ | |
import context._ | |
implicit val stopTimeout = 5.minutes | |
case object AllDead | |
// Derivations implement | |
def order(kids: Iterable[ActorRef]): Iterable[ActorRef] | |
// Kills kids in the order given by the list | |
def killKids(kids: List[ActorRef]): Future[Any] = { | |
kids match { | |
case kid :: Nil => | |
gracefulStop(kid, stopTimeout).flatMap { _ => | |
Future { AllDead } | |
} | |
case Nil => | |
Future { AllDead } | |
case kid :: rest => | |
gracefulStop(kid, stopTimeout).flatMap { _ => | |
killKids(rest) | |
} | |
} | |
} | |
// Initially we're waiting for the request for kids | |
def waiting: Receive = { | |
case GetChildren(forActor) => | |
watch(forActor) | |
forActor ! Children(children) | |
become(childrenGiven(forActor)) | |
} | |
// Once we've done it, we want to lock our aspect | |
// to the guy we gave them to | |
def childrenGiven(to: ActorRef): Receive = { | |
case GetChildren(forActor) if sender == to => | |
forActor ! Children(children) | |
case Terminated(`to`) => | |
killKids(order(children).toList) pipeTo self | |
case AllDead => | |
stop(self) | |
} | |
// Start waiting | |
def receive = waiting | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ActorSystem, Actor, Props, ActorRef} | |
import akka.testkit.{TestKit, ImplicitSender, TestProbe} | |
import org.scalatest.{WordSpec, BeforeAndAfterAll} | |
import org.scalatest.matchers.MustMatchers | |
// Our test worker will just help us see what's going | |
// on in our test | |
class TestWorker(a: ActorRef) extends Actor { | |
def receive = { | |
case _ => | |
} | |
override def postStop() = a ! self.path.name | |
} | |
class TerminatorSpec extends | |
TestKit(ActorSystem("TerminatorSpec")) | |
with ImplicitSender | |
with WordSpec | |
with BeforeAndAfterAll | |
with MustMatchers { | |
import Terminator._ | |
override def afterAll() { | |
system.shutdown() | |
} | |
"Terminator" should { | |
"work" in { | |
// Instantiate a real Terminator that starts up the kids | |
// in the preStart() method and provides an ordering of | |
// them | |
val a = system.actorOf(Props(new Terminator { | |
override def preStart() { | |
(1 to 20).foreach { i => | |
context.actorOf(Props(new TestWorker(testActor)), | |
"Worker-%02d".format(i)) | |
} | |
} | |
// Just for fun, we'll shut them down in reverse | |
// alphabetic order | |
def order(kids: Iterable[ActorRef]): Iterable[ActorRef] = | |
kids.toSeq.sorted.reverse | |
})) | |
// Need a test probe that we can stop | |
val p = TestProbe() | |
a.tell(GetChildren(p.ref), p.ref) | |
// A bit of a weak test, but who cares? | |
p.expectMsgPF() { | |
case Children(kids) => | |
kids.size must be (20) | |
} | |
// Stop the probe and wait for the 'dead' messages | |
// to arrive, ensuring they're in the right order | |
system.stop(p.ref) | |
(1 to 20).reverse.foreach { i => | |
expectMsg("Worker-%02d".format(i)) | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.Actor | |
class Worker extends Actor { | |
override def preStart() { | |
println("%s is running".format(self.path.name)) | |
} | |
override def postStop() { | |
println("%s has stopped".format(self.path.name)) | |
} | |
def receive = { | |
case msg => | |
println("Cool, I got a message: " + msg) | |
} | |
} |
On line 35 in Terminator.scala, you pass in a variable to TestWorker called testActor
. What is this?
Thanks!
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Any chance adding a license info to your code?