Last active
December 18, 2015 22:09
-
-
Save codingnirvana/5852355 to your computer and use it in GitHub Desktop.
Akka Cluster Master/Member Impln.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| object SampleClusterMember { | |
| def main(args: Array[String]): Unit = { | |
| val config = ConfigFactory.parseString(""" | |
| akka { | |
| actor { | |
| provider = "akka.cluster.ClusterActorRefProvider" | |
| } | |
| remote { | |
| transport = "akka.remote.netty.NettyRemoteTransport" | |
| log-remote-lifecycle-events = off | |
| netty.tcp { | |
| hostname = "127.0.0.1" | |
| port = 0 | |
| } | |
| } | |
| cluster { | |
| seed-nodes = [ | |
| "akka.tcp://ClusterSystem@127.0.0.1:2551"] | |
| auto-down = on | |
| } | |
| } | |
| """) | |
| val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
| //Thread.sleep(1000) | |
| val clusterMember = system.actorOf(Props[ClusterMember]) | |
| } | |
| class ClusterMember extends Actor with ActorLogging { | |
| val cluster = Cluster(context.system) | |
| // sort by age, oldest first | |
| val ageOrdering = Ordering.fromLessThan[Member] { (a, b) => a.isOlderThan(b) } | |
| var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) | |
| override def preStart() = cluster.subscribe(self, classOf[ClusterDomainEvent]) | |
| override def postStop() = cluster.unsubscribe(self) | |
| def register() = { | |
| currentMaster foreach { _ ! "Register" } | |
| postRegistration() | |
| } | |
| def postRegistration() = { | |
| log.info(s"Current Master is $currentMaster") | |
| } | |
| def receive = { | |
| case state: CurrentClusterState => | |
| membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { | |
| case m if m.hasRole("master") => { register(); m } | |
| } | |
| case MemberUp(m) => | |
| if (m.hasRole("master")) { | |
| membersByAge += m; register() | |
| } | |
| case MemberRemoved(m, _) => | |
| if (m.hasRole("master")) | |
| membersByAge -= m | |
| case _: MemberEvent => // not interesting | |
| } | |
| def currentMaster: Option[ActorSelection] = { | |
| membersByAge.size match { | |
| case 0 => None | |
| case _ => Some(context.actorSelection(RootActorPath(membersByAge.head.address) / | |
| "user" / "master")) | |
| } | |
| } | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| object SampleMaster { | |
| def main(args: Array[String]): Unit = { | |
| val config = ConfigFactory.parseString(""" | |
| akka { | |
| actor { | |
| provider = "akka.cluster.ClusterActorRefProvider" | |
| } | |
| remote { | |
| transport = "akka.remote.netty.NettyRemoteTransport" | |
| log-remote-lifecycle-events = off | |
| netty.tcp { | |
| hostname = "127.0.0.1" | |
| port = 2551 | |
| } | |
| } | |
| cluster { | |
| seed-nodes = [ | |
| "akka.tcp://ClusterSystem@127.0.0.1:2551"] | |
| roles = [master] | |
| auto-down = on | |
| } | |
| } | |
| """) | |
| val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
| val master = system.actorOf(Props[ClusterMaster], "master") | |
| } | |
| class ClusterMaster extends Actor with ActorLogging { | |
| def receive = { | |
| case "Register" => | |
| log.info(s"Registration Request: $sender") | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment