Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save pisfly/53d207b98eb5dcd3dfa7628d144678ed to your computer and use it in GitHub Desktop.
Save pisfly/53d207b98eb5dcd3dfa7628d144678ed to your computer and use it in GitHub Desktop.
Example of how to implement a simple cluster-wide actor registry.
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
/**
 * Multi-node configuration plus the actors exercised by the spec.
 *
 * Declares three roles (`first`, `second`, `third`), the shared Akka
 * configuration, a small test actor (`TestChatUser`) and the
 * `ClusterRegistry` actor that keeps a name -> ActorRef map and pushes its
 * children to the registries on other cluster nodes.
 */
object ClusterRegistrySpec extends MultiNodeConfig {
  val first: RoleName = role("first")
  val second: RoleName = role("second")
  val third: RoleName = role("third")

  // stripMargin yields exactly the same text as an unindented literal would.
  commonConfig(ConfigFactory.parseString(
    """
      |akka.loglevel = INFO
      |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      |akka.remote.log-remote-lifecycle-events = off
      |akka.cluster.auto-join = off
      |akka.cluster.auto-down = on
      |""".stripMargin))

  object TestChatUser {
    /** Asks the user actor to send `msg` to the user registered as `to`. */
    final case class Talk(to: String, msg: Any)
  }

  /**
   * Test actor created as a child of a `ClusterRegistry`.
   *
   * A `Talk` is turned into a `Send` addressed to the parent (the registry);
   * every other message is passed on to `testActor`.
   *
   * @param testActor receiver of all non-`Talk` messages
   */
  class TestChatUser(testActor: ActorRef) extends Actor {
    import TestChatUser._
    import ClusterRegistry._

    def receive: Receive = {
      case Talk(to, msg) => context.parent ! Send(to, msg)
      case msg           => testActor ! msg
    }
  }

  object ClusterRegistry {
    /** Create a child actor from `props` and register it under `name`. */
    final case class CreateActor(props: Props, name: String)

    /** Forward `msg` to the actor registered under `to`, if there is one. */
    final case class Send(to: String, msg: Any)

    /** Request the current number of registry entries; answered with an `Int`. */
    case object GetCount
  }

  /**
   * Registry actor mapping names to actor references.
   *
   * Locally created actors are children of this actor. Their references are
   * sent to the actor with the same path on every address in `nodes`, and
   * references received from elsewhere are added to the local map. All
   * registered references are watched and dropped again on `Terminated`.
   */
  class ClusterRegistry extends Actor {
    import ClusterRegistry._

    val cluster: Cluster = Cluster(context.system)

    // Mutable actor state; only touched from within `receive`.
    var registry: Map[String, ActorRef] = Map.empty
    var nodes: Set[Address] = Set.empty

    override def preStart(): Unit = {
      cluster.subscribe(self, classOf[MemberEvent])
      cluster.subscribe(self, classOf[UnreachableMember])
    }

    override def postStop(): Unit = {
      cluster.unsubscribe(self)
    }

    def receive: Receive = {
      case Send(to, msg) =>
        // Unknown names are silently ignored.
        registry.get(to).foreach(_ forward msg)

      case GetCount =>
        sender ! registry.size

      case CreateActor(props, name) =>
        val created = context.watch(context.actorOf(props, name))
        registry += (name -> created)
        sender ! created
        nodes.foreach(replicate(created, _))

      case ref: ActorRef =>
        // A bare ActorRef is how replicate/replicateAll publish an entry.
        context.watch(ref)
        registry += (ref.path.name -> ref)

      case state: CurrentClusterState =>
        nodes = state.members.map(_.address)
        for (m <- state.members if m.status == MemberStatus.Up)
          replicateAll(m.address)

      case MemberUp(m) =>
        nodes += m.address
        replicateAll(m.address)

      case memberEvent: MemberEvent =>
        // Every MemberEvent other than MemberUp (matched above) drops the node.
        nodes -= memberEvent.member.address

      case UnreachableMember(m) =>
        nodes -= m.address

      case Terminated(a) =>
        registry -= a.path.name
    }

    /**
     * Send my children to the registry at the other node.
     *
     * Does nothing when `address` is this node's own address.
     */
    def replicateAll(address: Address): Unit = {
      if (address != cluster.selfAddress) {
        val other = otherRegistry(address)
        context.children.foreach(child => other ! child)
      }
    }

    /**
     * Send a single reference to the registry at `address`, unless that is
     * this node's own address.
     */
    def replicate(ref: ActorRef, address: Address): Unit = {
      if (address != cluster.selfAddress)
        otherRegistry(address) ! ref
    }

    /** Reference to the actor with this actor's path on the node at `address`. */
    def otherRegistry(address: Address): ActorRef =
      context.actorFor(self.path.toStringWithAddress(address))
  }
}
// Concrete spec classes; each one simply extends ClusterRegistrySpec.
// NOTE(review): presumably one class per JVM/role, following the
// `...MultiJvmNodeN` naming used by multi-JVM test runners — confirm against
// the build setup.
class ClusterRegistryMultiJvmNode1 extends ClusterRegistrySpec
class ClusterRegistryMultiJvmNode2 extends ClusterRegistrySpec
class ClusterRegistryMultiJvmNode3 extends ClusterRegistrySpec
/**
 * Multi-node spec for `ClusterRegistry`.
 *
 * Forms a cluster out of the configured roles, starts one registry per
 * joined node, and checks that created users are visible on every joined
 * node, that messages can be routed to users on other nodes, and that
 * terminated users disappear from the registries.
 */
class ClusterRegistrySpec extends MultiNodeSpec(ClusterRegistrySpec) with STMultiNodeSpec with ImplicitSender {
  import ClusterRegistrySpec._
  import ClusterRegistrySpec.ClusterRegistry._
  import ClusterRegistrySpec.TestChatUser._

  override def initialParticipants = roles.size

  /**
   * On node `from`: join the cluster via node `to` and start the local
   * registry. All nodes then meet at the barrier `<from>-joined`.
   */
  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      Cluster(system).join(node(to).address)
      createRegisty()
    }
    enterBarrier(from.name + "-joined")
  }

  /**
   * Starts the registry actor as top-level actor "registry".
   * The method name (including its spelling) is kept so callers still compile.
   */
  def createRegisty(): ActorRef =
    system.actorOf(Props[ClusterRegistry], name = "registry")

  /** Looks up the local registry actor by path. */
  def registry: ActorRef = system.actorFor("user/registry")

  "A ClusterRegistry" must {

    "startup 2 node cluster" in {
      join(first, first)
      join(second, first)
      enterBarrier("after-1")
    }

    "keep track of added users" in within(15.seconds) {
      val r = registry

      runOn(first) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u1")
        val u1 = expectMsgType[ActorRef]
        r ! CreateActor(Props(new TestChatUser(testActor)), "u2")
        val u2 = expectMsgType[ActorRef]
        // talk to user at same node
        u1 ! Talk("u2", "hello")
        expectMsg("hello")
        lastSender must be(u2)
      }

      runOn(second) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u3")
        expectMsgType[ActorRef]
      }

      // u1, u2 (first) and u3 (second) must show up in both registries.
      runOn(first, second) {
        awaitCond {
          r ! GetCount
          expectMsgType[Int] == 3
        }
      }
      enterBarrier("3-registered")

      runOn(second) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u4")
        expectMsgType[ActorRef]
      }

      runOn(first, second) {
        awaitCond {
          r ! GetCount
          expectMsgType[Int] == 4
        }
      }
      enterBarrier("4-registered")

      runOn(first) {
        // talk to user on another node
        system.actorFor("/user/registry/u1") ! Talk("u4", "hi there")
      }

      runOn(second) {
        expectMsg("hi there")
        lastSender.path.name must be("u4")
      }

      enterBarrier("after-2")
    }

    "replicate users to new node" in within(20.seconds) {
      join(third, first)
      val r = registry

      runOn(third) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u5")
        expectMsgType[ActorRef]
      }

      // Runs on every node: u1..u5 must be known everywhere.
      awaitCond {
        r ! GetCount
        expectMsgType[Int] == 5
      }
      enterBarrier("5-registered")

      runOn(third) {
        system.actorFor("/user/registry/u5") ! Talk("u4", "go")
      }

      runOn(second) {
        expectMsg("go")
        lastSender.path.name must be("u4")
      }

      enterBarrier("after-3")
    }

    "remove terminated users" in within(5.seconds) {
      val r = registry

      runOn(second) {
        system.actorFor("/user/registry/u3") ! PoisonPill
      }

      // Five users (u1..u5) were registered by the previous steps; once u3 is
      // stopped every registry has to converge to four entries. Waiting for 5
      // would succeed before the termination is processed and verify nothing.
      awaitCond {
        r ! GetCount
        expectMsgType[Int] == 4
      }
      enterBarrier("after-4")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment