Skip to content

Instantly share code, notes, and snippets.

@mbarton
Created June 20, 2013 14:37
Show Gist options
  • Save mbarton/5823277 to your computer and use it in GitHub Desktop.
Save mbarton/5823277 to your computer and use it in GitHub Desktop.
import akka.actor.RootActorPath
import akka.cluster.ClusterEvent._
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberUp
import scala.collection.immutable.SortedSet
import scala.collection.mutable.{Map => MutableMap}
import akka.actor._
import akka.cluster.{MemberStatus, Cluster, Member}
import scala.Some
// Inspired by the ClusterSingletonManager in Akka contrib
// This class is similar but does not do state handover and automatically forwards
// on messages to the singleton
object ClusterSingletons {
val namespace = "clusterSingletons"
}
class ClusterSingletons(system: ActorSystem) {
def register(name: String, props: Props): ActorRef = {
system.actorOf(Props(classOf[SingletonActor], name, props), name = ClusterSingletons.namespace)
}
}
class SingletonActor(name: String, props: Props) extends Actor with ActorLogging {
val cluster = Cluster(context.system)
val orderByOldest = Ordering.fromLessThan[Member] { (a,b) => a.isOlderThan(b) }
var members: SortedSet[Member] = SortedSet.empty(orderByOldest)
var singleton: Option[ActorSelection] = None
var singletonInstance: Option[ActorRef] = None
override def preStart() = {
cluster.subscribe(self, classOf[MemberEvent])
}
override def postStop() = cluster.unsubscribe(self)
def updateSingleton(newAddress: Address) = {
if(newAddress == cluster.selfAddress) {
log.info("New oldest member is myself!")
if(singletonInstance == None) {
log.info(s"Spawning singleton actor: $name")
singletonInstance = Some(context.actorOf(props, name = name))
}
} else {
log.info("New oldest is not myself. Killing singleton actor (if already running)")
singletonInstance.map { _ ! PoisonPill }
}
singleton = Some(context.actorSelection(RootActorPath(newAddress) / "user" / ClusterSingletons.namespace / name))
}
def trackChanges(updateFunc: () => Unit) = {
val before = members.headOption
updateFunc()
val after = members.headOption
if(before != after) {
after.map { member => {
val address = member.address
log.info(s"New oldest member: singleton now on $address")
updateSingleton(address)
}}
}
}
def receive = {
case state: CurrentClusterState => {
val newMembers = SortedSet.empty(orderByOldest) ++ state.members.collect {
case member if member.status == MemberStatus.Up => member
}
trackChanges { () => members = newMembers }
}
case MemberUp(member) => trackChanges { () => members += member }
case MemberExited(member) => trackChanges { () => members -= member }
case MemberRemoved(member, _) => trackChanges { () => members -= member }
case UnreachableMember(member) => trackChanges { () => members -= member }
case msg => singleton.map { _ ! msg }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment