Skip to content

Instantly share code, notes, and snippets.

@havocp
Created May 30, 2012 23:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save havocp/2839564 to your computer and use it in GitHub Desktop.
Save havocp/2839564 to your computer and use it in GitHub Desktop.
some ActorCell hacking
diff --git a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala
index 120caa3..00ef54e 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala
@@ -5,6 +5,7 @@
package akka.actor
import akka.testkit._
+import akka.util.duration._
object HotSwapSpec {
abstract class Becomer extends Actor {
@@ -35,6 +36,38 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
expectMsg("4:pigdog")
}
+ "be able to unbecome back down to receive in its constructor" in {
+ val a = system.actorOf(Props(new Becomer {
+ for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always })
+ for (i ← 1 to 4) context.unbecome()
+ def receive = { case always ⇒ sender ! "SUCCESS" }
+ }))
+ a ! "pigdog"
+ expectMsg("SUCCESS")
+ }
+
+ "be able to unbecome back down to receive in its constructor after restart" in {
+ val restarter = system.actorOf(Props(new Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1)(List(classOf[Exception]))
+ val a = context.actorOf(Props(new Becomer {
+ for (i ← 1 to 4) context.become({ case always ⇒ sender ! i + ":" + always })
+ for (i ← 1 to 4) context.unbecome()
+ def receive = { case always ⇒ sender ! "SUCCESS" }
+ }))
+
+ def receive = {
+ case "killChild" ⇒ a ! Kill
+ case always ⇒ a forward always
+ }
+ }))
+
+ restarter ! "pigdog"
+ expectMsg("SUCCESS")
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectMsg("SUCCESS")
+ }
+
"be able to become with stacking in its constructor" in {
val a = system.actorOf(Props(new Becomer {
context.become({ case always ⇒ sender ! "pigdog:" + always; context.unbecome() }, false)
@@ -124,4 +157,49 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
expectMsg("0")
}
}
+
+ "all behaviors removed after failed constructor" in {
+ val a = system.actorOf(Props(new Becomer {
+ context.become { case always ⇒ sender ! always }
+ throw new RuntimeException("expected exception: not creating Becomer")
+ def receive = { case always ⇒ sender ! "FAILURE" }
+ }))
+ a ! "pigdog"
+ expectNoMsg(1 second)
+ }
+
+ "all behaviors removed after failed preStart" in {
+ val a = system.actorOf(Props(new Becomer {
+ context.become { case always ⇒ sender ! always }
+ override def preStart = throw new RuntimeException("expected exception: failing to preStart")
+ def receive = { case always ⇒ sender ! "FAILURE" }
+ }))
+ a ! "pigdog"
+ expectNoMsg(1 second)
+ }
+
+ "all behaviors removed after failed postRestart" in {
+ val restarter = system.actorOf(Props(new Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1)(List(classOf[Exception]))
+
+ val a = system.actorOf(Props(new Becomer {
+ context.become { case always ⇒ sender ! always }
+ def receive = { case always ⇒ sender ! "FAILURE" }
+ override def postRestart(cause: Throwable): Unit = {
+ throw new RuntimeException("expected exception: failing to postRestart")
+ }
+ }))
+
+ def receive = {
+ case "killChild" ⇒ a ! Kill
+ case always ⇒ a forward always
+ }
+ }))
+
+ restarter ! "pigdog"
+ expectMsg("pigdog")
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectNoMsg(1 second)
+ }
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
index 197e749..539d955 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala
@@ -8,6 +8,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Await }
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import akka.testkit.AkkaSpec
import akka.testkit.DefaultTimeout
+import akka.testkit.ImplicitSender
import akka.pattern.ask
import akka.util.duration._
import akka.util.NonFatal
@@ -24,7 +25,7 @@ object SupervisorMiscSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout {
+class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with DefaultTimeout with ImplicitSender {
"A Supervisor" must {
@@ -142,5 +143,118 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
expectMsg("green")
}
+ class DelegatingStrategy(val delegate: SupervisorStrategy) extends SupervisorStrategy {
+ import java.util.concurrent.atomic.AtomicInteger
+
+ val supervisorFailingCount = new AtomicInteger(0)
+ val supervisorRestartedCount = new AtomicInteger(0)
+ override def decider = delegate.decider
+ override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit =
+ delegate.handleChildTerminated(context, child, children)
+ override def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit =
+ delegate.processFailure(context, restart, child, cause, stats, children)
+ override def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
+ supervisorFailingCount.incrementAndGet()
+ delegate.handleSupervisorFailing(supervisor, children)
+ }
+
+ override def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = {
+ supervisorRestartedCount.incrementAndGet()
+ delegate.handleSupervisorRestarted(cause, supervisor, children)
+ }
+
+ override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean =
+ delegate.handleFailure(context, child, cause, stats, children)
+ }
+
+ "send handleSupervisorFailing and handleSupervisorRestarted to the right Actor instance on postRestart fail" in {
+ var strategies = Seq.empty[DelegatingStrategy]
+
+ val restarter = system.actorOf(Props(new Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))
+ val a = context.actorOf(Props(new Actor {
+ override val supervisorStrategy = {
+ val s = new DelegatingStrategy(OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))
+ strategies = strategies :+ s
+ s
+ }
+
+ override def postRestart(cause: Throwable): Unit =
+ throw new RuntimeException("expected exception: postRestart failing")
+
+ def receive = {
+ case anything ⇒ sender ! anything + " from supervised"
+ }
+ }), name = "child")
+ def receive = {
+ case "killChild" ⇒ a ! Kill
+ case always ⇒ a forward always
+ }
+ }), name = "restarterPostRestart")
+
+ restarter ! "pigdog"
+ expectMsg("pigdog from supervised")
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectNoMsg(1 second)
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectNoMsg(1 second)
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectNoMsg(1 second)
+
+ strategies.size must be(4)
+ val failCounts = strategies.map(_.supervisorFailingCount.get)
+ val restartedCounts = strategies.map(_.supervisorRestartedCount.get)
+ // because each actor constructs OK then fails in postRestart,
+ // the SupervisorStrategy for the actor itself should be used.
+ (failCounts, restartedCounts) must be((Seq(1, 1, 1, 1), Seq(1, 1, 1, 0)))
+
+ system.stop(restarter)
+ }
+
+ "send handleSupervisorFailing and handleSupervisorRestarted to the right Actor instance on constructor fail" in {
+ var strategies = Seq.empty[DelegatingStrategy]
+
+ val restarter = system.actorOf(Props(new Actor {
+ override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception]))
+ val a = context.actorOf(Props(new Actor {
+ override val supervisorStrategy = {
+ val s = new DelegatingStrategy(OneForOneStrategy(maxNrOfRetries = 3)(List(classOf[Exception])))
+ strategies = strategies :+ s
+ s
+ }
+
+ // fail the constructor after the first run
+ if (strategies.size > 1)
+ throw new RuntimeException("expected exception: constructor failing")
+
+ def receive = {
+ case anything ⇒ sender ! anything + " from supervised"
+ }
+ }), name = "child")
+ def receive = {
+ case "killChild" ⇒ a ! Kill
+ case always ⇒ a forward always
+ }
+ }), name = "restarterConstructor")
+
+ restarter ! "pigdog"
+ expectMsg("pigdog from supervised")
+ restarter ! "killChild"
+ restarter ! "pigdog"
+ expectNoMsg(1 second)
+
+ strategies.size must be(4)
+ val failCounts = strategies.map(_.supervisorFailingCount.get)
+ val restartedCounts = strategies.map(_.supervisorRestartedCount.get)
+ // original Actor should get its own failure plus 3 constructor failures,
+ // and 3 restart attempts; because no later attempt constructs successfully.
+ (failCounts, restartedCounts) must be((Seq(4, 0, 0, 0), Seq(3, 0, 0, 0)))
+
+ system.stop(restarter)
+ }
+
}
}
diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
index d295e6d..ddb5974 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala
@@ -354,7 +354,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration)
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
- EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
+ EventFilter[IllegalStateException]("error while recreating actor", occurrences = 1)) {
intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
index d4d5239..eebb451 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala
@@ -78,12 +78,14 @@ trait ActorContext extends ActorRefFactory {
/**
* Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
* Puts the behavior on top of the hotswap stack.
- * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
+ * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack.
*/
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
+ * Never removes the original `receive` behavior (once all `become()` have
+ * been reversed, this becomes a no-op).
*/
def unbecome(): Unit
@@ -185,12 +187,18 @@ private[akka] object ActorCell {
final val emptyReceiveTimeoutData: (Long, Cancellable) = (-1, emptyCancellable)
- final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
+ // when the Actor instance postdates a recreate or terminate,
+ // we don't want to use its actual behavior, so we put this
+ // in instead just to maintain the invariant that the
+ // behaviorStack is only empty while actor is null.
+ // TODO it's probably a bug if we receive a message while
+ // this is the behavior stack, so we can probably drop this.
+ final val deadActorBehaviorStack: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
sealed trait SuspendReason
case object UserRequest extends SuspendReason
- case class Recreation(cause: Throwable) extends SuspendReason
- case object Termination extends SuspendReason
+ case class Recreation(cause: Throwable, failed: Actor) extends SuspendReason
+ case class Termination(terminatedOption: Option[Actor]) extends SuspendReason
trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer
@@ -283,8 +291,8 @@ private[akka] object ActorCell {
def remove(child: ActorRef): ChildrenContainer = {
val t = toDie - child
if (t.isEmpty) reason match {
- case Termination ⇒ TerminatedChildrenContainer
- case _ ⇒ NormalChildrenContainer(c - child.path.name)
+ case _: Termination ⇒ TerminatedChildrenContainer
+ case _ ⇒ NormalChildrenContainer(c - child.path.name)
}
else copy(c - child.path.name, t)
}
@@ -357,12 +365,12 @@ private[akka] class ActorCell(
var childrenRefs: ChildrenContainer = EmptyChildrenContainer
private def isTerminating = childrenRefs match {
- case TerminatingChildrenContainer(_, _, Termination) ⇒ true
+ case TerminatingChildrenContainer(_, _, _: Termination) ⇒ true
case TerminatedChildrenContainer ⇒ true
case _ ⇒ false
}
private def isNormal = childrenRefs match {
- case TerminatingChildrenContainer(_, _, Termination | _: Recreation) ⇒ false
+ case TerminatingChildrenContainer(_, _, _: Termination | _: Recreation) ⇒ false
case _ ⇒ true
}
@@ -409,8 +417,23 @@ private[akka] class ActorCell(
var currentMessage: Envelope = null
+ // invariant: actor eq null whenever our actor never existed or is in the
+ // process of becoming nonexistent; currently these windows of time:
+ // - from ActorCell construction until first Actor instance constructor completes
+ // - from recreate request until recreated Actor instance constructor completes or fails
+ // - from terminate request until we terminate or fail to terminate
+ // After successful termination or failed recreation, set to a "dead" actor
+ // and the actor's real behavior is removed from behaviorStack.
+ // "dead" actors are kept around just to use their supervisorStrategy.
var actor: Actor = _
+ // behaviorStack has three states:
+ // - set to empty anytime actor is set to null, and remains empty
+ // until start of Actor instance constructor
+ // - from start of Actor instance constructor to end of constructor,
+ // collects become() behaviors called by constructor
+ // - when actor is set to non-null post-construction, behaviorStack
+ // must be set to valid and ready-to-use behaviors
private var behaviorStack: Stack[Actor.Receive] = Stack.empty
@volatile //This must be volatile since it isn't protected by the mailbox status
@@ -491,24 +514,33 @@ private[akka] class ActorCell(
case _ ⇒ system.deadLetters
}
+ protected def clearActor(): Unit = {
+ behaviorStack = Stack.empty
+ actor = null
+ }
+
//This method is in charge of setting up the contextStack and create a new instance of the Actor
- protected def newActor(): Actor = {
+ protected def createActor(): Unit = {
+ require(behaviorStack.isEmpty)
+ require(actor eq null)
+
contextStack.set(contextStack.get.push(this))
try {
- import ActorCell.behaviorStackPlaceHolder
-
- behaviorStack = behaviorStackPlaceHolder
val instance = props.creator.apply()
if (instance eq null)
throw new ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'")
- behaviorStack = behaviorStack match {
- case `behaviorStackPlaceHolder` ⇒ Stack.empty.push(instance.receive)
- case newBehaviors ⇒ Stack.empty.push(instance.receive).pushAll(newBehaviors.reverse.drop(1))
+ if (behaviorStack.isEmpty) {
+ behaviorStack = Stack.empty.push(instance.receive)
+ } else {
+ behaviorStack = Stack.empty.push(instance.receive).pushAll(behaviorStack.reverse)
}
- instance
+ actor = instance
} finally {
+ if (actor eq null)
+ behaviorStack = Stack.empty // remove any become() from failed create
+
val stackAfter = contextStack.get
if (stackAfter.nonEmpty)
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context
@@ -520,11 +552,12 @@ private[akka] class ActorCell(
def create(): Unit = if (isNormal) {
try {
- val created = newActor()
- actor = created
- created.preStart()
+ require(actor eq null)
+ require(behaviorStack.isEmpty)
+ createActor()
+ actor.preStart()
checkReceiveTimeout
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "started (" + actor + ")"))
} catch {
case NonFatal(i: InstantiationException) ⇒
throw new ActorInitializationException(self,
@@ -539,21 +572,21 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = if (isNormal) {
try {
val failedActor = actor
+ clearActor()
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
- if (failedActor ne null) {
- val c = currentMessage //One read only plz
- try {
- if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
- } finally {
- clearActorFields(failedActor)
- }
+ require(failedActor ne null)
+ val c = currentMessage //One read only plz
+ try {
+ if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
+ } finally {
+ clearActorFields(failedActor)
}
childrenRefs match {
case ct: TerminatingChildrenContainer ⇒
- childrenRefs = ct.copy(reason = Recreation(cause))
+ childrenRefs = ct.copy(reason = Recreation(cause, failedActor))
dispatcher suspend this
case _ ⇒
- doRecreate(cause, failedActor)
+ finishRecreate(cause, failedActor)
}
} catch {
case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e match {
@@ -583,16 +616,19 @@ private[akka] class ActorCell(
setReceiveTimeout(None)
cancelReceiveTimeout
+ val terminated = actor
+ clearActor()
+
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
childrenRefs match {
case ct: TerminatingChildrenContainer ⇒
- childrenRefs = ct.copy(reason = Termination)
+ childrenRefs = ct.copy(reason = Termination(Option(terminated)))
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
- case _ ⇒ doTerminate()
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(terminated), "stopping"))
+ case _ ⇒ finishTerminate(Option(terminated))
}
}
@@ -662,9 +698,17 @@ private[akka] class ActorCell(
become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld)
def unbecome(): Unit = {
- val original = behaviorStack
- val popped = original.pop
- behaviorStack = if (popped.isEmpty) original else popped
+ // actor.receive at the base of the stack can never be popped,
+ // but during actor construction we can pop down to empty.
+ if (actor ne null) {
+ require(behaviorStack.nonEmpty)
+ val original = behaviorStack
+ val popped = original.pop
+ behaviorStack = if (popped.isEmpty) original else popped
+ } else {
+ if (behaviorStack.nonEmpty)
+ behaviorStack = behaviorStack.pop
+ }
}
def autoReceiveMessage(msg: Envelope): Unit = {
@@ -682,16 +726,22 @@ private[akka] class ActorCell(
}
final def receiveMessage(msg: Any): Unit = {
+ require(actor ne null)
+ require(behaviorStack.nonEmpty)
//FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize"
val head = behaviorStack.head
if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg)
}
- private def doTerminate() {
- val a = actor
+ private def finishTerminate(terminatedOption: Option[Actor]) {
+ require(actor eq null)
+ require(behaviorStack.isEmpty)
try {
try {
- if (a ne null) a.postStop()
+ for (terminated ← terminatedOption) {
+ if (terminated.context ne null)
+ terminated.postStop()
+ }
} finally {
dispatcher.detach(this)
}
@@ -700,35 +750,66 @@ private[akka] class ActorCell(
parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle)
- system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped"))
+ system.eventStream.publish(Debug(self.path.toString, clazz(terminatedOption.orNull), "stopped"))
} finally {
- behaviorStack = ActorCell.behaviorStackPlaceHolder
- clearActorFields(a)
- actor = null
+ for (terminated ← terminatedOption) {
+ clearActorFields(terminated)
+ actor = terminated
+ behaviorStack = ActorCell.deadActorBehaviorStack
+ }
}
}
}
- private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
- // after all killed children have terminated, recreate the rest, then go on to start the new instance
- actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
- val freshActor = newActor()
- actor = freshActor // this must happen before postRestart has a chance to fail
- if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
-
- freshActor.postRestart(cause)
- if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
-
- dispatcher.resume(this)
+ // if we have the Actor instance cleared pending termination
+ // or recreation, this guards against failure to complete
+ // the termination or recreation
+ private def handlingCleanupFailure(deadOption: Option[Actor], stage: String)(body: ⇒ Unit): Unit = try {
+ body
} catch {
case NonFatal(e) ⇒ try {
- dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
- // prevent any further messages to be processed until the actor has been restarted
+ dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(deadOption.orNull), "error while " + stage + " actor"), e))
+
dispatcher.suspend(this)
- actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor?
- clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called.
+
+ for (dead ← deadOption) {
+ dead.supervisorStrategy.handleSupervisorFailing(self, children)
+ // we won't call preRestart or postStop again with fields cleared
+ clearActorFields(dead)
+ }
} finally {
- parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self)
+ // put the failed actor back so we have its supervisorStrategy
+ for (dead ← deadOption) {
+ actor = dead
+ behaviorStack = ActorCell.deadActorBehaviorStack
+ }
+ parent.tell(Failed(new ActorInitializationException(self, "exception while " + stage + " actor", e)), self)
+ }
+ }
+
+ // after all killed children have terminated, recreate the rest, then go on to start the new instance
+ private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try {
+ require(failedActor ne null)
+ require(failedActor.context eq null)
+ require(actor eq null)
+ require(behaviorStack.isEmpty)
+
+ // failures are handled by the "current" actor's supervisorStrategy,
+ // which means failedActor until actor ne null and then actor after
+
+ handlingCleanupFailure(Some(failedActor), "recreating") {
+ failedActor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
+
+ createActor()
+
+ handlingCleanupFailure(Some(actor), "postRestart-ing") {
+ if (actor eq failedActor) setActorFields(actor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
+
+ actor.postRestart(cause)
+ if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "restarted"))
+
+ dispatcher.resume(this)
+ }
}
}
@@ -737,29 +818,37 @@ private[akka] class ActorCell(
case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
- final def handleChildTerminated(child: ActorRef): Unit = try {
+ private def invokeHandleChildTerminated(supervisorOption: Option[Actor], child: ActorRef): Unit = {
+ for (supervisor ← supervisorOption) {
+ handlingCleanupFailure(supervisorOption, "child-terminated-notifying") {
+ supervisor.supervisorStrategy.handleChildTerminated(this, child, children)
+ }
+ }
+ }
+
+ final def handleChildTerminated(child: ActorRef): Unit = {
childrenRefs match {
case tc @ TerminatingChildrenContainer(_, _, reason) ⇒
val n = tc.remove(child)
childrenRefs = n
- actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
- case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
- case Termination ⇒ doTerminate()
- case _ ⇒
+ case Recreation(cause, failedActor) ⇒
+ invokeHandleChildTerminated(Some(failedActor), child)
+ finishRecreate(cause, failedActor)
+ case Termination(terminatedActorOption) ⇒
+ invokeHandleChildTerminated(terminatedActorOption, child)
+ // TODO should handlingCleanupFailure be moved inside
+ // finishTerminate as it is for finishRecreate?
+ handlingCleanupFailure(terminatedActorOption, "terminating") {
+ finishTerminate(terminatedActorOption)
+ }
+ case _ ⇒
+ invokeHandleChildTerminated(Option(actor), child)
}
case _ ⇒
childrenRefs = childrenRefs.remove(child)
- actor.supervisorStrategy.handleChildTerminated(this, child, children)
+ invokeHandleChildTerminated(Option(actor), child)
}
- } catch {
- case NonFatal(e) ⇒
- try {
- dispatcher suspend this
- actor.supervisorStrategy.handleSupervisorFailing(self, children)
- } finally {
- parent.tell(Failed(e), self)
- }
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment