Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Created August 16, 2013 15:22
Show Gist options
  • Save ryantanner/6250841 to your computer and use it in GitHub Desktop.
Save ryantanner/6250841 to your computer and use it in GitHub Desktop.
// pass in the role name of the service being tested
object MultiSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
val supervisor = role("supervisor")
val service1 = role("service1")
val service2 = role("service2")
val nodeConfig: Config = ConfigFactory.parseString(s"""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.cluster.log-info = off
akka.test.timefactor = 4
akka.actor.debug.receive = off
akka.cluster.failure-detector.threshold = 8
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
""")
val supervisorConfig = ConfigFactory.parseString(s"""
akka.cluster.roles = [ supervisor ]
""")
val serviceConfig = ConfigFactory.parseString(s"""
akka.cluster.roles = [ analytics ]
akka.actor.deployment {
/analytics-leader-singleton/analytics-leader/nodeRouter {
router = round-robin
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-role = analytics
}
}
}
""")
commonConfig(nodeConfig)
nodeConfig(supervisor)(supervisorConfig)
nodeConfig(service1)(serviceConfig)
nodeConfig(service2)(serviceConfig)
}
class AnalyticsSpecMultiJvm1 extends AnalyticsSpec
class AnalyticsSpecMultiJvm2 extends AnalyticsSpec
class AnalyticsSpecMultiJvm3 extends AnalyticsSpec
abstract class AnalyticsSpec extends MultiNodeSpec(MultiSpecConfig)
with WordSpec with MustMatchers with BeforeAndAfterAll
with ImplicitSender with BeforeAndAfterEach {
import MultiSpecConfig._
val facade = TestActorRef(Props[Facade], "facade")
override def initialParticipants = roles.size
override def beforeAll() = multiNodeSpecBeforeAll()
override def afterAll() = multiNodeSpecAfterAll()
override def beforeEach() = {
}
override def afterEach() = {
}
//#abstract-test
"The analytics service" must {
//#startup-cluster
"illustrate how to startup cluster" in within(15.seconds.dilated) {
Cluster(system).subscribe(testActor, classOf[MemberUp])
expectMsgClass(classOf[CurrentClusterState])
// Analytics leader. One per cluster.
runOn(service1, service2) {
system.actorOf(
ClusterSingletonManager.props(
singletonProps = handOverData => Props[AnalyticsLeader],
singletonName = "analytics-leader",
terminationMessage = PoisonPill,
role = Some("analytics")
),
name = "analytics-leader-singleton"
)
}
Cluster(system).subscribe(facade, classOf[MemberUp])
Cluster(system) join node(supervisor).address
val members = receiveN(3).collect { case MemberUp(m) => m }.toSet
val supervisorMember = members.find(_.address == node(supervisor).address).get
members.map(_.address) must be (
Set(node(supervisor).address, node(service1).address, node(service2).address))
Cluster(system).unsubscribe(testActor)
runOn(supervisor) {
val supervisorProbe = system.actorOf(Props(new Actor {
def receive = {
case x ⇒ testActor.tell(x, sender)
}
override def preStart = {
context.actorOf(Props(new Actor {
def receive = {
case x => testActor.tell(x, sender)
}
}), "supervisor-leader")
}
}), "supervisor-leader-singleton")
}
// Set supervisor in all facades to point to testActor
facade.underlyingActor.asInstanceOf[Facade].roleMembersByAge -= "supervisor"
facade.underlyingActor.asInstanceOf[Facade].roleMembersByAge += ("supervisor" -> SortedSet[Member](supervisorMember))
//testConductor.enter("all-up")
enterBarrier("all-up")
}
//#startup-cluster
//#test-statsService
"show usage of the analytics leader from one node" in within(35.seconds.dilated) {
runOn(service1) {
assertServiceOk()
}
//testConductor.enter("done-2")
enterBarrier("done-2")
}
def assertServiceOk(): Unit = {
//val facade = system.actorSelection("facade")
// eventually the service should be ok,
// first attempts might fail because worker actors not started yet
awaitAssert {
//maybeService.foreach(service => service ! StartAnalytics(Nil, 42L))
val sa = StartAnalytics(Nil, 42L)
log.info("Sending {}", sa.uuid)
system.actorSelection("/user/facade") ! sa
expectMsgType[AcknowledgeAnalytics](5.second).uuid must be(sa.uuid)
expectMsgType[CompletedAnalytics](5.second).uuid must be(sa.uuid)
}
}
//#test-statsService
"show usage of the analytics leader from all nodes" in within(15.seconds.dilated) {
assertServiceOk()
//testConductor.enter("done-3")
enterBarrier("done-3")
}
"move service leader if first node leaves" in within(45.seconds.dilated) {
/*
runOn(service1) {
enterBarrier("done-4")
}
*/
runOn(service2) {
val oldLeader = facade.underlyingActor.asInstanceOf[Facade].currentLeader("analytics")
val address = node(service1).address
Cluster(system) down address
log.info("Removing {}", address)
within(10.seconds.dilated) {
expectNoMsg
}
assertServiceOk()
awaitAssert {
facade.underlyingActor.asInstanceOf[Facade].currentLeader("analytics") must not equal (oldLeader)
}
}
//testConductor.enter("done-4")
enterBarrier("done-4")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment