@derekwyatt
Created August 18, 2012 17:49
Shutdown pattern
// Spawn your futures
val fs = (1 to 100).map { i =>
  Future { Thread.sleep(i); i }
}

// Wrap all of the work up into a single Future
val f = Future.sequence(fs)

// Wait on it forever - i.e. until it's done
Await.result(f, Duration.Inf)

// Shut down
system.shutdown()
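The snippet above relies on an implicit execution context and a `system` that are defined elsewhere. As a rough, self-contained sketch of the same idea, the version below swaps the ActorSystem for a plain thread pool and uses the `scala.concurrent` API; the object name `FutureShutdownSketch`, the method `runAll`, and the pool size of 4 are assumptions made for illustration and are not part of the gist.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical stand-alone variant: a thread pool plays the role of `system`.
object FutureShutdownSketch {
  // Spawns `n` small tasks, waits for all of them, then shuts the pool down.
  def runAll(n: Int): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(4)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      // Spawn the futures
      val fs = (1 to n).map { i => Future { Thread.sleep(i.toLong); i } }
      // Collapse them into one Future and block until it completes
      Await.result(Future.sequence(fs), Duration.Inf)
    } finally {
      // Shut down only after all of the work has finished (or failed)
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    println(runAll(10).sum) // prints 55
  }
}
```

Placing the shutdown in a `finally` block is a design choice of this sketch: the pool is released even when one of the futures fails and `Await.result` rethrows.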
class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = context.system.shutdown()
}
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method. It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}
import akka.actor.{ActorSystem, Props, ActorRef}
import akka.testkit.{TestKit, ImplicitSender, TestProbe}
import org.scalatest.{WordSpec, BeforeAndAfterAll}
import org.scalatest.matchers.MustMatchers

// Our test reaper. Sends the snooper a message when all
// the souls have been reaped
class TestReaper(snooper: ActorRef) extends Reaper {
  def allSoulsReaped(): Unit = snooper ! "Dead"
}

class ReaperSpec extends TestKit(ActorSystem("ReaperSpec"))
  with ImplicitSender
  with WordSpec
  with BeforeAndAfterAll
  with MustMatchers {
  import Reaper._

  // Shut the test ActorSystem down once all tests have run
  override def afterAll(): Unit = {
    system.shutdown()
  }

  "Reaper" should {
    "work" in {
      // Set up some dummy Actors
      val a = TestProbe()
      val b = TestProbe()
      val c = TestProbe()
      val d = TestProbe()

      // Build our reaper
      val reaper = system.actorOf(Props(new TestReaper(testActor)))

      // Watch a couple
      reaper ! WatchMe(a.ref)
      reaper ! WatchMe(d.ref)

      // Stop them
      system.stop(a.ref)
      system.stop(d.ref)

      // Make sure we've been called
      expectMsg("Dead")
    }
  }
}
@soulmachine

What happens if an uncaught exception is thrown in the watched actor? It seems the watched actor won't be able to send a Terminated message to its parent actor, so the ArrayBuffer Reaper.watched will never be empty.