Created August 16, 2013 at 15:22.
Save ryantanner/6250841 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// pass in the role name of the service being tested | |
// pass in the role name of the service being tested
object MultiSpecConfig extends MultiNodeConfig {
  // register the named roles (nodes) of the test
  val supervisor = role("supervisor")
  val service1 = role("service1")
  val service2 = role("service2")

  // Settings common to every node: cluster actor-ref provider, quiet
  // logging for test runs, a 4x time factor for slow CI machines, and a
  // JMX-based metrics collector so the native sigar library is not needed.
  // NOTE: plain """ (not s"""), since no interpolation is used and a literal
  // '$' in the config would otherwise break compilation.
  val nodeConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.log-dead-letters = off
    akka.log-dead-letters-during-shutdown = off
    akka.cluster.log-info = off
    akka.test.timefactor = 4
    akka.actor.debug.receive = off
    akka.cluster.failure-detector.threshold = 8
    # don't use sigar for tests, native lib not in path
    akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
    """)

  // The supervisor node only carries the "supervisor" cluster role.
  val supervisorConfig = ConfigFactory.parseString("""
    akka.cluster.roles = [ supervisor ]
    """)

  // Service nodes carry the "analytics" role and deploy a cluster-aware
  // round-robin router under the analytics-leader singleton: up to 100
  // routees total, at most 3 per node, local routees allowed, restricted
  // to nodes with the "analytics" role.
  val serviceConfig = ConfigFactory.parseString("""
    akka.cluster.roles = [ analytics ]
    akka.actor.deployment {
      /analytics-leader-singleton/analytics-leader/nodeRouter {
        router = round-robin
        nr-of-instances = 100
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = on
          use-role = analytics
        }
      }
    }
    """)

  // Apply the shared config to all nodes, then layer the per-role configs.
  commonConfig(nodeConfig)
  nodeConfig(supervisor)(supervisorConfig)
  nodeConfig(service1)(serviceConfig)
  nodeConfig(service2)(serviceConfig)
}
// One concrete spec class per JVM: the sbt-multi-jvm plugin keys on the
// "MultiJvmN" suffix and launches one JVM per class — one for each of the
// three roles (supervisor, service1, service2) declared in MultiSpecConfig.
class AnalyticsSpecMultiJvm1 extends AnalyticsSpec
class AnalyticsSpecMultiJvm2 extends AnalyticsSpec
class AnalyticsSpecMultiJvm3 extends AnalyticsSpec
/**
 * Multi-node test of the analytics service. Each AnalyticsSpecMultiJvmN
 * subclass runs this spec in its own JVM, one per role in MultiSpecConfig.
 * Tests run in declaration order and synchronize via enterBarrier.
 */
abstract class AnalyticsSpec extends MultiNodeSpec(MultiSpecConfig)
  with WordSpec with MustMatchers with BeforeAndAfterAll
  with ImplicitSender with BeforeAndAfterEach {

  import MultiSpecConfig._

  // Facade under test; TestActorRef allows synchronous access to
  // underlyingActor so the tests can inspect/patch its internal state.
  val facade = TestActorRef(Props[Facade], "facade")

  // Every role's JVM must reach the first barrier before any test proceeds.
  override def initialParticipants = roles.size

  override def beforeAll() = multiNodeSpecBeforeAll()
  override def afterAll() = multiNodeSpecAfterAll()

  // Intentionally empty; kept so per-test hooks can be added later.
  override def beforeEach() = {}
  override def afterEach() = {}

  //#abstract-test
  "The analytics service" must {

    //#startup-cluster
    "illustrate how to startup cluster" in within(15.seconds.dilated) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      // Analytics leader. One per cluster: ClusterSingletonManager on each
      // service node elects a single AnalyticsLeader among "analytics" nodes.
      runOn(service1, service2) {
        system.actorOf(
          ClusterSingletonManager.props(
            singletonProps = handOverData => Props[AnalyticsLeader],
            singletonName = "analytics-leader",
            terminationMessage = PoisonPill,
            role = Some("analytics")
          ),
          name = "analytics-leader-singleton"
        )
      }

      Cluster(system).subscribe(facade, classOf[MemberUp])
      Cluster(system) join node(supervisor).address

      // Expect exactly one MemberUp per role, then verify the member set.
      val members = receiveN(3).collect { case MemberUp(m) => m }.toSet
      // .get is safe here only because the supervisor's MemberUp was just
      // received above; if it were missing, the line before would time out.
      val supervisorMember = members.find(_.address == node(supervisor).address).get
      members.map(_.address) must be(
        Set(node(supervisor).address, node(service1).address, node(service2).address))
      Cluster(system).unsubscribe(testActor)

      // On the supervisor node, stand in a probe hierarchy that forwards
      // everything (preserving the original sender) to testActor, mimicking
      // the supervisor-leader singleton's actor path.
      runOn(supervisor) {
        val supervisorProbe = system.actorOf(Props(new Actor {
          def receive = {
            case x => testActor.tell(x, sender)
          }
          override def preStart = {
            context.actorOf(Props(new Actor {
              def receive = {
                case x => testActor.tell(x, sender)
              }
            }), "supervisor-leader")
          }
        }), "supervisor-leader-singleton")
      }

      // Set supervisor in all facades to point to testActor
      facade.underlyingActor.asInstanceOf[Facade].roleMembersByAge -= "supervisor"
      facade.underlyingActor.asInstanceOf[Facade].roleMembersByAge += ("supervisor" -> SortedSet[Member](supervisorMember))

      enterBarrier("all-up")
    }
    //#startup-cluster

    //#test-statsService
    "show usage of the analytics leader from one node" in within(35.seconds.dilated) {
      runOn(service1) {
        assertServiceOk()
      }
      enterBarrier("done-2")
    }

    /**
     * Sends a StartAnalytics request through the facade and asserts that both
     * the acknowledgement and the completion carry the request's uuid.
     * Wrapped in awaitAssert because early attempts may fail while worker
     * actors are still starting up.
     */
    def assertServiceOk(): Unit = {
      awaitAssert {
        val sa = StartAnalytics(Nil, 42L)
        log.info("Sending {}", sa.uuid)
        system.actorSelection("/user/facade") ! sa
        expectMsgType[AcknowledgeAnalytics](5.second).uuid must be(sa.uuid)
        expectMsgType[CompletedAnalytics](5.second).uuid must be(sa.uuid)
      }
    }
    //#test-statsService

    "show usage of the analytics leader from all nodes" in within(15.seconds.dilated) {
      assertServiceOk()
      enterBarrier("done-3")
    }

    "move service leader if first node leaves" in within(45.seconds.dilated) {
      runOn(service2) {
        val oldLeader = facade.underlyingActor.asInstanceOf[Facade].currentLeader("analytics")
        val address = node(service1).address
        // Mark service1 as down so the singleton hands over to another node.
        Cluster(system) down address
        log.info("Removing {}", address)
        // Give the cluster time to detect the removal and hand over the
        // singleton before asserting the service works again.
        within(10.seconds.dilated) {
          expectNoMsg
        }
        assertServiceOk()
        awaitAssert {
          facade.underlyingActor.asInstanceOf[Facade].currentLeader("analytics") must not equal (oldLeader)
        }
      }
      enterBarrier("done-4")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.