Created
January 27, 2013 19:18
-
-
Save patriknw/4649875 to your computer and use it in GitHub Desktop.
Port of ClusterSingletonManager to Akka 2.1.0. Use the 2.1.1 version when that is out.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.contrib.pattern | |
import scala.concurrent.duration._ | |
import akka.actor.Actor | |
import akka.actor.Actor.Receive | |
import akka.actor.ActorLogging | |
import akka.actor.ActorRef | |
import akka.actor.Address | |
import akka.actor.FSM | |
import akka.actor.Props | |
import akka.actor.Terminated | |
import akka.cluster.Cluster | |
import akka.cluster.ClusterEvent._ | |
import akka.cluster.MemberStatus | |
import akka.AkkaException | |
object ClusterSingletonManager { | |
/** | |
* Internal API | |
* public due to the `with FSM` type parameters | |
*/ | |
sealed trait State | |
/** | |
* Internal API | |
* public due to the `with FSM` type parameters | |
*/ | |
sealed trait Data | |
/** | |
* Internal API | |
*/ | |
private object Internal { | |
/** | |
* Sent from new leader to previous leader to initate the | |
* hand over process. `HandOverInProgress` and `HandOverDone` | |
* are expected replies. | |
*/ | |
case object HandOverToMe | |
/** | |
* Confirmation by the previous leader that the hand | |
* over process, shutdown of the singleton actor, has | |
* started. | |
*/ | |
case object HandOverInProgress | |
/** | |
* Confirmation by the previous leader that the singleton | |
* actor has been terminated and the hand over process is | |
* completed. The `handOverData` holds the message, if any, | |
* sent from the singleton actor to its parent ClusterSingletonManager | |
* when shutting down. It is passed to the `singletonProps` | |
* factory on the new leader node. | |
*/ | |
case class HandOverDone(handOverData: Option[Any]) | |
/** | |
* Sent from from previous leader to new leader to | |
* initiate the normal hand over process. | |
* Especially useful when new node joins and becomes | |
* leader immediately, without knowing who was previous | |
* leader. | |
*/ | |
case object TakeOverFromMe | |
case class HandOverRetry(count: Int) | |
case class TakeOverRetry(leaderPeer: ActorRef, count: Int) | |
case object Cleanup | |
case object StartLeaderChangedBuffer | |
case object PreStart extends State | |
case object Start extends State | |
case object Leader extends State | |
case object NonLeader extends State | |
case object BecomingLeader extends State | |
case object WasLeader extends State | |
case object HandingOver extends State | |
case object TakeOver extends State | |
case object Uninitialized extends Data | |
case class NonLeaderData(leaderOption: Option[Address]) extends Data | |
case class BecomingLeaderData(previousLeaderOption: Option[Address]) extends Data | |
case class LeaderData(singleton: ActorRef, singletonTerminated: Boolean = false, | |
handOverData: Option[Any] = None) extends Data | |
case class WasLeaderData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], | |
newLeader: Address) extends Data | |
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data | |
val HandOverRetryTimer = "hand-over-retry" | |
val TakeOverRetryTimer = "take-over-retry" | |
val CleanupTimer = "cleanup" | |
object LeaderChangedBuffer { | |
/** | |
* Request to deliver one more event. | |
*/ | |
case object GetNext | |
/** | |
* The first event, corresponding to CurrentClusterState. | |
*/ | |
case class InitialLeaderState(leader: Option[Address], memberCount: Int) | |
} | |
/** | |
* Notifications of [[akka.cluster.ClusterEvent.LeaderChanged]] is tunneled | |
* via this actor (child of ClusterSingletonManager) to be able to deliver | |
* one change at a time. Avoiding simultaneous leader changes simplifies | |
* the process in ClusterSingletonManager. ClusterSingletonManager requests | |
* next event with `GetNext` when it is ready for it. Only one outstanding | |
* `GetNext` request is allowed. Incoming events are buffered and delivered | |
* upon `GetNext` request. | |
*/ | |
class LeaderChangedBuffer extends Actor { | |
import LeaderChangedBuffer._ | |
import context.dispatcher | |
val cluster = Cluster(context.system) | |
var changes = Vector.empty[AnyRef] | |
var memberCount = 0 | |
// subscribe to LeaderChanged, re-subscribe when restart | |
override def preStart(): Unit = cluster.subscribe(self, classOf[LeaderChanged]) | |
override def postStop(): Unit = cluster.unsubscribe(self) | |
def receive = { | |
case state: CurrentClusterState ⇒ | |
changes :+= InitialLeaderState(state.leader, state.members.size) | |
case event: LeaderChanged ⇒ | |
changes :+= event | |
case GetNext if changes.isEmpty ⇒ | |
context.become(deliverNext, discardOld = false) | |
case GetNext ⇒ | |
val event = changes.head | |
changes = changes.tail | |
context.parent ! event | |
} | |
// the buffer was empty when GetNext was received, deliver next event immediately | |
def deliverNext: Actor.Receive = { | |
case state: CurrentClusterState ⇒ | |
context.parent ! InitialLeaderState(state.leader, state.members.size) | |
context.unbecome() | |
case event: LeaderChanged ⇒ | |
context.parent ! event | |
context.unbecome() | |
} | |
} | |
} | |
} | |
/** | |
* Java API. Factory for the [[akka.actor.Props]] of the singleton | |
* actor instance. Used in constructor of | |
* [[akka.contrib.pattern.ClusterSingletonManager]] | |
*/ | |
@SerialVersionUID(1L) | |
trait ClusterSingletonPropsFactory extends Serializable { | |
/** | |
* Create the `Props` from the `handOverData` sent from | |
* previous singleton. `handOverData` might be null | |
* when no hand over took place, or when the there is no need | |
* for sending data to the new singleton. | |
*/ | |
def create(handOverData: Any): Props | |
} | |
/** | |
* Thrown when a consistent state can't be determined within the | |
* defined retry limits. Eventually it will reach a stable state and | |
* can continue, and that is simplified by starting over with a clean | |
* state. Parent supervisor should typically restart the actor, i.e. | |
* default decision. | |
*/ | |
class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(message, null) | |
/** | |
* Manages a cluster wide singleton actor instance, i.e. | |
* at most one singleton instance is running at any point in time. | |
* The ClusterSingletonManager is supposed to be started on all | |
* nodes in the cluster with `actorOf`. The actual singleton is | |
* started on the leader node of the cluster by creating a child | |
* actor from the supplied `singletonProps`. | |
* | |
* The singleton actor is always running on the leader member, which is | |
* nothing more than the address currently sorted first in the member | |
* ring. This can change when adding or removing members. A graceful hand | |
* over can normally be performed when joining a new node that becomes | |
* leader or removing current leader node. Be aware that there is a | |
* short time period when there is no active singleton during the | |
* hand over process. | |
* | |
* The singleton actor can at any time send a message to its parent | |
* ClusterSingletonManager and this message will be passed to the | |
* `singletonProps` factory on the new leader node when a graceful | |
* hand over is performed. | |
* | |
* The cluster failure detector will notice when a leader node | |
* becomes unreachable due to things like JVM crash, hard shutdown, | |
* or network failure. Then a new leader node will take over and a | |
* new singleton actor is created. For these failure scenarios there | |
* will not be a graceful hand over, but more than one active singletons | |
* is prevented by all reasonable means. Some corner cases are eventually | |
* resolved by configurable timeouts. | |
* | |
* You access the singleton actor with `actorFor` using the names you have | |
* specified when creating the ClusterSingletonManager. You can subscribe to | |
* [[akka.cluster.ClusterEvent.LeaderChanged]] to keep track of which node | |
* it is supposed to be running on. Alternatively the singleton actor may | |
* broadcast its existence when it is started. | |
* | |
* ==Arguments== | |
* | |
* '''''singletonProps''''' Factory for [[akka.actor.Props]] of the | |
* singleton actor instance. The `Option` parameter is the the | |
* `handOverData` sent from previous singleton. `handOverData` | |
* might be None when no hand over took place, or when the there | |
* is no need for sending data to the new singleton. The `handOverData` | |
* is typically passed as parameter to the constructor of the | |
* singleton actor. | |
* | |
* '''''singletonName''''' The actor name of the child singleton actor. | |
* | |
* '''''terminationMessage''''' When handing over to a new leader node | |
* this `terminationMessage` is sent to the singleton actor to tell | |
* it to finish its work, close resources, and stop. It can sending | |
* a message back to the parent ClusterSingletonManager, which will | |
* passed to the `singletonProps` factory on the new leader node. | |
* The hand over to the new leader node is completed when the | |
* singleton actor is terminated. | |
* Note that [[akka.actor.PoisonPill]] is a perfectly fine | |
* `terminationMessage` if you only need to stop the actor. | |
* | |
* '''''maxHandOverRetries''''' When a node is becoming leader it sends | |
* hand over request to previous leader. This is retried with the | |
* `retryInterval` until the previous leader confirms that the hand | |
* over has started, or this `maxHandOverRetries` limit has been | |
* reached. If the retry limit is reached it takes the decision to be | |
* the new leader if previous leader is unknown (typically removed or | |
* downed), otherwise it initiates a new round by throwing | |
* [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting | |
* restart with fresh state. For a cluster with many members you might | |
* need to increase this retry limit because it takes longer time to | |
* propagate changes across all nodes. | |
* | |
* '''''maxTakeOverRetries''''' When a leader node is not leader any more | |
* it sends take over request to the new leader to initiate the normal | |
* hand over process. This is especially useful when new node joins and becomes | |
* leader immediately, without knowing who was previous leader. This is retried | |
* with the `retryInterval` until this retry limit has been reached. If the retry | |
* limit is reached it initiates a new round by throwing | |
* [[akka.contrib.pattern.ClusterSingletonManagerIsStuck]] and expecting | |
* restart with fresh state. This will also cause the singleton actor to be | |
* stopped. `maxTakeOverRetries` must be less than `maxHandOverRetries` to | |
* ensure that new leader doesn't start singleton actor before previous is | |
* stopped for certain corner cases. | |
* | |
* '''''loggingEnabled''''' Logging of what is going on at info log level. | |
*/ | |
class ClusterSingletonManager( | |
singletonProps: Option[Any] ⇒ Props, | |
singletonName: String, | |
terminationMessage: Any, | |
maxHandOverRetries: Int = 20, | |
maxTakeOverRetries: Int = 15, | |
retryInterval: FiniteDuration = 1.second, | |
loggingEnabled: Boolean = true) | |
extends Actor with FSM[ClusterSingletonManager.State, ClusterSingletonManager.Data] { | |
// to ensure that new leader doesn't start singleton actor before previous is stopped for certain corner cases | |
require(maxTakeOverRetries < maxHandOverRetries, | |
s"maxTakeOverRetries [${maxTakeOverRetries}]must be < maxHandOverRetries [${maxHandOverRetries}]") | |
/** | |
* Full Java API constructor. | |
*/ | |
def this( | |
singletonName: String, | |
terminationMessage: Any, | |
maxHandOverRetries: Int, | |
maxTakeOverRetries: Int, | |
retryInterval: FiniteDuration, | |
loggingEnabled: Boolean, | |
singletonPropsFactory: ClusterSingletonPropsFactory) = | |
this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage, | |
maxHandOverRetries, maxTakeOverRetries, retryInterval) | |
/** | |
* Java API constructor with default values. | |
*/ | |
def this( | |
singletonName: String, | |
terminationMessage: Any, | |
singletonPropsFactory: ClusterSingletonPropsFactory) = | |
this(handOverData ⇒ singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage) | |
import ClusterSingletonManager._ | |
import ClusterSingletonManager.Internal._ | |
import ClusterSingletonManager.Internal.LeaderChangedBuffer._ | |
val cluster = Cluster(context.system) | |
val selfAddressOption = Some(cluster.selfAddress) | |
// started when when self member is Up | |
var leaderChangedBuffer: ActorRef = _ | |
// Previous GetNext request delivered event and new GetNext is to be sent | |
var leaderChangedReceived = true | |
// keep track of previously downed members | |
var downed = Map.empty[Address, Deadline] | |
// keep track of previously removed members | |
var removed = Map.empty[Address, Deadline] | |
def addDowned(address: Address): Unit = | |
downed += address -> (Deadline.now + 15.minutes) | |
def addRemoved(address: Address): Unit = | |
removed += address -> (Deadline.now + 15.minutes) | |
def cleanupOverdueNotMemberAnyMore(): Unit = { | |
downed = downed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } | |
removed = removed filter { case (address, deadline) ⇒ deadline.hasTimeLeft } | |
} | |
def logInfo(message: String): Unit = | |
if (loggingEnabled) log.info(message) | |
def logInfo(template: String, arg1: Any): Unit = | |
if (loggingEnabled) log.info(template, arg1) | |
def logInfo(template: String, arg1: Any, arg2: Any): Unit = | |
if (loggingEnabled) log.info(template, arg1, arg2) | |
override def preStart(): Unit = { | |
super.preStart() | |
require(cluster.isRunning, "Cluster node must not be terminated") | |
// subscribe to cluster changes, re-subscribe when restart | |
cluster.subscribe(self, classOf[MemberUp]) | |
cluster.subscribe(self, classOf[MemberDowned]) | |
cluster.subscribe(self, classOf[MemberRemoved]) | |
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) | |
} | |
override def postStop(): Unit = { | |
cancelTimer(CleanupTimer) | |
cluster.unsubscribe(self) | |
super.postStop() | |
} | |
def peer(at: Address): ActorRef = context.actorFor(self.path.toStringWithAddress(at)) | |
def getNextLeaderChanged(): Unit = | |
if (leaderChangedReceived) { | |
leaderChangedReceived = false | |
leaderChangedBuffer ! GetNext | |
} | |
startWith(PreStart, Uninitialized) | |
when(PreStart) { | |
case Event(state: CurrentClusterState, _) if state.members.exists(m ⇒ m.address == cluster.selfAddress && m.status == MemberStatus.Up) ⇒ | |
self ! StartLeaderChangedBuffer | |
goto(Start) | |
case Event(MemberUp(m), _) if m.address == cluster.selfAddress ⇒ | |
self ! StartLeaderChangedBuffer | |
goto(Start) | |
} | |
when(Start) { | |
case Event(StartLeaderChangedBuffer, _) ⇒ | |
leaderChangedBuffer = context.actorOf(Props[LeaderChangedBuffer].withDispatcher(context.props.dispatcher)) | |
getNextLeaderChanged() | |
stay | |
case Event(InitialLeaderState(leaderOption, memberCount), _) ⇒ | |
leaderChangedReceived = true | |
if (leaderOption == selfAddressOption && memberCount == 1) | |
// alone, leader immediately | |
gotoLeader(None) | |
else if (leaderOption == selfAddressOption) | |
goto(BecomingLeader) using BecomingLeaderData(None) | |
else | |
goto(NonLeader) using NonLeaderData(leaderOption) | |
} | |
when(NonLeader) { | |
case Event(LeaderChanged(leaderOption), NonLeaderData(previousLeaderOption)) ⇒ | |
leaderChangedReceived = true | |
if (leaderOption == selfAddressOption) { | |
logInfo("NonLeader observed LeaderChanged: [{} -> myself]", previousLeaderOption) | |
previousLeaderOption match { | |
case None ⇒ gotoLeader(None) | |
case Some(prev) if downed.contains(prev) ⇒ gotoLeader(None) | |
case Some(prev) ⇒ | |
peer(prev) ! HandOverToMe | |
goto(BecomingLeader) using BecomingLeaderData(previousLeaderOption) | |
} | |
} else { | |
logInfo("NonLeader observed LeaderChanged: [{} -> {}]", previousLeaderOption, leaderOption) | |
getNextLeaderChanged() | |
stay using NonLeaderData(leaderOption) | |
} | |
case Event(MemberDowned(m), NonLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ | |
logInfo("Previous leader downed [{}]", m.address) | |
addDowned(m.address) | |
// transition when LeaderChanged | |
stay using NonLeaderData(None) | |
case Event(MemberRemoved(m), _) if m.address == cluster.selfAddress ⇒ | |
logInfo("Self removed, stopping ClusterSingletonManager") | |
stop() | |
} | |
when(BecomingLeader) { | |
case Event(HandOverInProgress, _) ⇒ | |
// confirmation that the hand over process has started | |
logInfo("Hand over in progress at [{}]", sender.path.address) | |
cancelTimer(HandOverRetryTimer) | |
stay | |
case Event(HandOverDone(handOverData), BecomingLeaderData(Some(previousLeader))) ⇒ | |
if (sender.path.address == previousLeader) | |
gotoLeader(handOverData) | |
else { | |
logInfo("Ignoring HandOverDone in BecomingLeader from [{}]. Expected previous leader [{}]", | |
sender.path.address, previousLeader) | |
stay | |
} | |
case Event(MemberDowned(m), BecomingLeaderData(Some(previousLeader))) if m.address == previousLeader ⇒ | |
logInfo("Previous leader [{}] downed", previousLeader) | |
addDowned(m.address) | |
gotoLeader(None) | |
case Event(TakeOverFromMe, BecomingLeaderData(None)) ⇒ | |
sender ! HandOverToMe | |
stay using BecomingLeaderData(Some(sender.path.address)) | |
case Event(TakeOverFromMe, BecomingLeaderData(Some(previousLeader))) ⇒ | |
if (previousLeader == sender.path.address) sender ! HandOverToMe | |
else logInfo("Ignoring TakeOver request in BecomingLeader from [{}]. Expected previous leader [{}]", | |
sender.path.address, previousLeader) | |
stay | |
case Event(HandOverRetry(count), BecomingLeaderData(previousLeaderOption)) ⇒ | |
if (count <= maxHandOverRetries) { | |
logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousLeaderOption) | |
previousLeaderOption foreach { peer(_) ! HandOverToMe } | |
setTimer(HandOverRetryTimer, HandOverRetry(count + 1), retryInterval, repeat = false) | |
} else if (previousLeaderOption.isEmpty) { | |
// can't send HandOverToMe, previousLeader unknown for new node (or restart) | |
// previous leader might be down or removed, so no TakeOverFromMe message is received | |
logInfo("Timeout in BecomingLeader. Previous leader unknown and no TakeOver request.") | |
gotoLeader(None) | |
} else | |
throw new ClusterSingletonManagerIsStuck( | |
s"Becoming singleton leader was stuck because previous leader [${previousLeaderOption}] is unresponsive") | |
} | |
def gotoLeader(handOverData: Option[Any]): State = { | |
logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress) | |
val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName) | |
goto(Leader) using LeaderData(singleton) | |
} | |
when(Leader) { | |
case Event(LeaderChanged(leaderOption), LeaderData(singleton, singletonTerminated, handOverData)) ⇒ | |
leaderChangedReceived = true | |
logInfo("Leader observed LeaderChanged: [{} -> {}]", cluster.selfAddress, leaderOption) | |
leaderOption match { | |
case Some(a) if a == cluster.selfAddress ⇒ | |
// already leader | |
stay | |
case Some(a) if downed.contains(a) || removed.contains(a) ⇒ | |
gotoHandingOver(singleton, singletonTerminated, handOverData, None) | |
case Some(a) ⇒ | |
// send TakeOver request in case the new leader doesn't know previous leader | |
val leaderPeer = peer(a) | |
leaderPeer ! TakeOverFromMe | |
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, 1), retryInterval, repeat = false) | |
goto(WasLeader) using WasLeaderData(singleton, singletonTerminated, handOverData, newLeader = a) | |
case _ ⇒ | |
// new leader will initiate the hand over | |
stay | |
} | |
case Event(HandOverToMe, LeaderData(singleton, singletonTerminated, handOverData)) ⇒ | |
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) | |
case Event(singletonHandOverMessage, d @ LeaderData(singleton, _, _)) if sender == singleton ⇒ | |
stay using d.copy(handOverData = Some(singletonHandOverMessage)) | |
case Event(Terminated(ref), d @ LeaderData(singleton, _, _)) if ref == singleton ⇒ | |
stay using d.copy(singletonTerminated = true) | |
} | |
when(WasLeader) { | |
case Event(TakeOverRetry(leaderPeer, count), _) ⇒ | |
val newLeader = leaderPeer.path.address | |
if (count <= maxTakeOverRetries) { | |
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newLeader) | |
leaderPeer ! TakeOverFromMe | |
setTimer(TakeOverRetryTimer, TakeOverRetry(leaderPeer, count + 1), retryInterval, repeat = false) | |
stay | |
} else | |
throw new ClusterSingletonManagerIsStuck(s"Expected hand over to [${newLeader}] never occured") | |
case Event(HandOverToMe, WasLeaderData(singleton, singletonTerminated, handOverData, _)) ⇒ | |
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender)) | |
case Event(MemberDowned(m), WasLeaderData(singleton, singletonTerminated, handOverData, newLeader)) if m.address == newLeader ⇒ | |
addDowned(m.address) | |
gotoHandingOver(singleton, singletonTerminated, handOverData, None) | |
case Event(singletonHandOverMessage, d @ WasLeaderData(singleton, _, _, _)) if sender == singleton ⇒ | |
stay using d.copy(handOverData = Some(singletonHandOverMessage)) | |
case Event(Terminated(ref), d @ WasLeaderData(singleton, _, _, _)) if ref == singleton ⇒ | |
stay using d.copy(singletonTerminated = true) | |
} | |
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], handOverTo: Option[ActorRef]): State = { | |
if (singletonTerminated) { | |
handOverDone(handOverTo, handOverData) | |
} else { | |
handOverTo foreach { _ ! HandOverInProgress } | |
singleton ! terminationMessage | |
goto(HandingOver) using HandingOverData(singleton, handOverTo, handOverData) | |
} | |
} | |
when(HandingOver) { | |
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo, handOverData))) if ref == singleton ⇒ | |
handOverDone(handOverTo, handOverData) | |
case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo, _)) if handOverTo == Some(sender) ⇒ | |
// retry | |
sender ! HandOverInProgress | |
stay | |
case Event(singletonHandOverMessage, d @ HandingOverData(singleton, _, _)) if sender == singleton ⇒ | |
stay using d.copy(handOverData = Some(singletonHandOverMessage)) | |
} | |
def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = { | |
val newLeader = handOverTo.map(_.path.address) | |
logInfo("Singleton terminated, hand over done [{} -> {}]", cluster.selfAddress, newLeader) | |
handOverTo foreach { _ ! HandOverDone(handOverData) } | |
goto(NonLeader) using NonLeaderData(newLeader) | |
} | |
whenUnhandled { | |
case Event(_: CurrentClusterState, _) ⇒ stay | |
case Event(MemberRemoved(m), _) ⇒ | |
logInfo("Member removed [{}]", m.address) | |
// if self removed, it will be stopped onTranstion to NonLeader | |
addRemoved(m.address) | |
stay | |
case Event(MemberDowned(m), _) ⇒ | |
logInfo("Member downed [{}]", m.address) | |
addDowned(m.address) | |
stay | |
case Event(TakeOverFromMe, _) ⇒ | |
logInfo("Ignoring TakeOver request in [{}] from [{}].", stateName, sender.path.address) | |
stay | |
case Event(Cleanup, _) ⇒ | |
cleanupOverdueNotMemberAnyMore() | |
stay | |
case Event(MemberUp(_), _) ⇒ | |
stay | |
} | |
onTransition { | |
case from -> to ⇒ logInfo("ClusterSingletonManager state change [{} -> {}]", from, to) | |
} | |
onTransition { | |
case _ -> BecomingLeader ⇒ setTimer(HandOverRetryTimer, HandOverRetry(1), retryInterval, repeat = false) | |
} | |
onTransition { | |
case BecomingLeader -> _ ⇒ cancelTimer(HandOverRetryTimer) | |
case WasLeader -> _ ⇒ cancelTimer(TakeOverRetryTimer) | |
} | |
onTransition { | |
case _ -> (NonLeader | Leader) ⇒ getNextLeaderChanged() | |
} | |
onTransition { | |
case _ -> NonLeader if removed.contains(cluster.selfAddress) || downed.contains(cluster.selfAddress) ⇒ | |
logInfo("Self removed, stopping ClusterSingletonManager") | |
stop() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment