Created
June 20, 2013 14:37
-
-
Save mbarton/5823277 to your computer and use it in GitHub Desktop.
Akka stateless cluster singleton Inspired by https://github.com/akka/akka/blob/master/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.RootActorPath | |
import akka.cluster.ClusterEvent._ | |
import akka.cluster.ClusterEvent.CurrentClusterState | |
import akka.cluster.ClusterEvent.MemberRemoved | |
import akka.cluster.ClusterEvent.MemberUp | |
import scala.collection.immutable.SortedSet | |
import scala.collection.mutable.{Map => MutableMap} | |
import akka.actor._ | |
import akka.cluster.{MemberStatus, Cluster, Member} | |
import scala.Some | |
// Inspired by the ClusterSingletonManager in Akka contrib | |
// This class is similar but does not do state handover and automatically forwards | |
// on messages to the singleton | |
object ClusterSingletons { | |
val namespace = "clusterSingletons" | |
} | |
class ClusterSingletons(system: ActorSystem) { | |
def register(name: String, props: Props): ActorRef = { | |
system.actorOf(Props(classOf[SingletonActor], name, props), name = ClusterSingletons.namespace) | |
} | |
} | |
class SingletonActor(name: String, props: Props) extends Actor with ActorLogging { | |
val cluster = Cluster(context.system) | |
val orderByOldest = Ordering.fromLessThan[Member] { (a,b) => a.isOlderThan(b) } | |
var members: SortedSet[Member] = SortedSet.empty(orderByOldest) | |
var singleton: Option[ActorSelection] = None | |
var singletonInstance: Option[ActorRef] = None | |
override def preStart() = { | |
cluster.subscribe(self, classOf[MemberEvent]) | |
} | |
override def postStop() = cluster.unsubscribe(self) | |
def updateSingleton(newAddress: Address) = { | |
if(newAddress == cluster.selfAddress) { | |
log.info("New oldest member is myself!") | |
if(singletonInstance == None) { | |
log.info(s"Spawning singleton actor: $name") | |
singletonInstance = Some(context.actorOf(props, name = name)) | |
} | |
} else { | |
log.info("New oldest is not myself. Killing singleton actor (if already running)") | |
singletonInstance.map { _ ! PoisonPill } | |
} | |
singleton = Some(context.actorSelection(RootActorPath(newAddress) / "user" / ClusterSingletons.namespace / name)) | |
} | |
def trackChanges(updateFunc: () => Unit) = { | |
val before = members.headOption | |
updateFunc() | |
val after = members.headOption | |
if(before != after) { | |
after.map { member => { | |
val address = member.address | |
log.info(s"New oldest member: singleton now on $address") | |
updateSingleton(address) | |
}} | |
} | |
} | |
def receive = { | |
case state: CurrentClusterState => { | |
val newMembers = SortedSet.empty(orderByOldest) ++ state.members.collect { | |
case member if member.status == MemberStatus.Up => member | |
} | |
trackChanges { () => members = newMembers } | |
} | |
case MemberUp(member) => trackChanges { () => members += member } | |
case MemberExited(member) => trackChanges { () => members -= member } | |
case MemberRemoved(member, _) => trackChanges { () => members -= member } | |
case UnreachableMember(member) => trackChanges { () => members -= member } | |
case msg => singleton.map { _ ! msg } | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment