Skip to content

Instantly share code, notes, and snippets.

@codingnirvana
Last active December 18, 2015 22:09
Show Gist options
  • Select an option

  • Save codingnirvana/5852355 to your computer and use it in GitHub Desktop.

Select an option

Save codingnirvana/5852355 to your computer and use it in GitHub Desktop.
Akka Cluster Master/Member implementation.
/**
 * Entry point and actor for a non-master cluster node.
 *
 * The node joins the "ClusterSystem" cluster through the seed node configured
 * below (127.0.0.1:2551) and starts a [[SampleClusterMember.ClusterMember]]
 * actor that registers itself with the oldest node carrying the "master" role.
 */
object SampleClusterMember {

  /**
   * Starts the actor system with the inline cluster configuration and spawns
   * one `ClusterMember` actor.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // The literal is kept flush-left so the configuration text is unchanged.
    val config = ConfigFactory.parseString("""
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"]
auto-down = on
}
}
""")
    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
    //Thread.sleep(1000)
    system.actorOf(Props[ClusterMember])
  }

  /**
   * Tracks the cluster members that have the "master" role, ordered oldest
   * first, and sends "Register" to the actor at `/user/master` on the oldest
   * of them whenever the known set of masters is (re)established or grows.
   */
  class ClusterMember extends Actor with ActorLogging {
    val cluster = Cluster(context.system)

    // sort by age, oldest first
    val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) }
    var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

    override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])

    override def postStop(): Unit = cluster.unsubscribe(self)

    /** Sends "Register" to the current master, if one is known, then runs the post-registration hook. */
    def register(): Unit = {
      currentMaster foreach { _ ! "Register" }
      postRegistration()
    }

    /** Hook invoked after every registration attempt; logs the selection that was targeted. */
    def postRegistration(): Unit = {
      log.info(s"Current Master is $currentMaster")
    }

    def receive: Receive = {
      case state: CurrentClusterState =>
        // Store the snapshot BEFORE registering: register() resolves the target
        // through membersByAge, so calling it while the set is still being built
        // would look at the previous (initially empty) set and send nothing.
        membersByAge =
          immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(_.hasRole("master"))
        if (membersByAge.nonEmpty) register()

      case MemberUp(m) =>
        if (m.hasRole("master")) {
          membersByAge += m
          register()
        }

      case MemberRemoved(m, _) =>
        if (m.hasRole("master")) membersByAge -= m

      case _: MemberEvent => // not interesting
    }

    /**
     * Selection of the `/user/master` actor on the oldest known master node.
     *
     * @return `None` when no node with the "master" role is currently known
     */
    def currentMaster: Option[ActorSelection] =
      membersByAge.headOption.map { oldest =>
        context.actorSelection(RootActorPath(oldest.address) / "user" / "master")
      }
  }
}
/**
 * Entry point and actor for the cluster node that carries the "master" role.
 *
 * The node listens on 127.0.0.1:2551, which is also the address listed as the
 * seed node in its own configuration, and hosts a
 * [[SampleMaster.ClusterMaster]] actor named "master".
 */
object SampleMaster {

  // Inline configuration for the master node; the literal is kept flush-left
  // so the configuration text is unchanged.
  private def masterConfig = ConfigFactory.parseString("""
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551"]
roles = [master]
auto-down = on
}
}
""")

  /**
   * Boots the "ClusterSystem" actor system and starts the master actor under
   * the name "master".
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val settings = ConfigFactory.load(masterConfig)
    val actorSystem = ActorSystem("ClusterSystem", settings)
    actorSystem.actorOf(Props[ClusterMaster], "master")
  }

  /** Logs every "Register" request together with the sender it came from. */
  class ClusterMaster extends Actor with ActorLogging {
    def receive = {
      case "Register" =>
        val requester = sender
        log.info(s"Registration Request: $requester")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment