Last active
December 21, 2015 04:09
-
-
Save ryantanner/6247240 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Multi-node configuration for the backend end-to-end test.
 *
 * Registers four roles (`supervisor`, `analytics`, `imap`, `mailer`) and gives each
 * of them a node configuration made of a one-line `akka.cluster.roles` setting
 * layered on top of the shared [[nodeConfig]] settings.
 */
object BackendConfig extends MultiNodeConfig {

  // Register the named roles (nodes) of the test.
  val supervisor = role("supervisor")
  val analytics = role("analytics")
  val imap = role("imap")
  val mailer = role("mailer")

  /**
   * Settings shared by every node: cluster actor-ref provider, quiet logging,
   * test time factor, failure-detector threshold, JMX metrics collector and the
   * router deployments for the analytics, mailer and imap leaders.
   *
   * NOTE(review): this val shares its name with the two-parameter-list
   * `nodeConfig(role)(config)` registration call used at the bottom of this
   * object (presumably inherited from MultiNodeConfig); the compiler tells them
   * apart by how they are applied. Renaming would change the public member, so
   * the name is kept.
   *
   * The literal is a plain (non-interpolated) string on purpose: HOCON uses
   * `${...}` for its own substitutions, which an `s"..."` interpolator would
   * try to evaluate as Scala.
   */
  val nodeConfig: Config = ConfigFactory.parseString("""
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.log-dead-letters = off
    akka.log-dead-letters-during-shutdown = off
    akka.cluster.log-info = off
    akka.actor.debug.receive = off
    akka.test.timefactor = 4
    akka.cluster.failure-detector.threshold = 8
    # don't use sigar for tests, native lib not in path
    akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
    akka.actor.deployment {
      /analytics-leader-singleton/analytics-leader/nodeRouter {
        router = round-robin
        nr-of-instances = 100
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = on
          use-role = analytics
        }
      }
      /mailer-leader-singleton/mailer-leader/nodeRouter {
        router = round-robin
        nr-of-instances = 100
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = on
          use-role = mailer
        }
      }
      /imap-leader-singleton/imap-leader/nodeRouter {
        router = round-robin
        nr-of-instances = 100
        cluster {
          enabled = on
          max-nr-of-instances-per-node = 3
          allow-local-routees = on
          use-role = imap
        }
      }
      /imap-leader-singleton/imap-leader/nodeRouter/"*"/imap-message_reader_router {
        router = round-robin
        nr-of-instances = 5
      }
      /imap-leader-singleton/imap-leader/nodeRouter/"*"/imap-message_writer_router {
        router = round-robin
        nr-of-instances = 5
      }
      /mailer-leader-singleton/mailer-leader/nodeRouter/"*"/statsReaderRouter {
        router = round-robin
        nr-of-instances = 5
      }
      /mailer-leader-singleton/mailer-leader/nodeRouter/"*"/senderRouter {
        router = round-robin
        nr-of-instances = 5
      }
      /mailer-leader-singleton/mailer-leader/nodeRouter/"*"/rendererRouter {
        router = round-robin
        nr-of-instances = 5
      }
    }
  """)

  // Per-role overrides: each node only differs in the cluster role it advertises.
  val supervisorConfig: Config = ConfigFactory.parseString("""
    akka.cluster.roles = [ supervisor ]
  """)

  val analyticsConfig: Config = ConfigFactory.parseString("""
    akka.cluster.roles = [ analytics ]
  """)

  val mailerConfig: Config = ConfigFactory.parseString("""
    akka.cluster.roles = [ mailer ]
  """)

  val imapConfig: Config = ConfigFactory.parseString("""
    akka.cluster.roles = [ imap ]
  """)

  // The shared settings are layered under every role config explicitly instead
  // of being registered once through commonConfig(nodeConfig) (left disabled in
  // the original). Registration order matches the original: supervisor,
  // analytics, mailer, imap.
  Seq(
    supervisor -> supervisorConfig,
    analytics -> analyticsConfig,
    mailer -> mailerConfig,
    imap -> imapConfig
  ).foreach { case (roleName, roleConfig) =>
    nodeConfig(roleName)(roleConfig.withFallback(nodeConfig))
  }
}
// Concrete entry points for the multi-JVM run: four classes, matching the four
// roles registered in BackendConfig. All behaviour lives in EndToEndSpec.
// NOTE(review): the `MultiJvmN` suffix presumably follows the sbt-multi-jvm
// naming convention (one class per JVM) — confirm against the build setup.

/** Test instance #1 of the end-to-end spec. */
class EndToEndSpecMultiJvm1 extends EndToEndSpec

/** Test instance #2 of the end-to-end spec. */
class EndToEndSpecMultiJvm2 extends EndToEndSpec

/** Test instance #3 of the end-to-end spec. */
class EndToEndSpecMultiJvm3 extends EndToEndSpec

/** Test instance #4 of the end-to-end spec. */
class EndToEndSpecMultiJvm4 extends EndToEndSpec
/**
 * End-to-end multi-node test of the backend.
 *
 * Every node joins the cluster through the supervisor node, the analytics,
 * mailer and imap nodes each start a cluster singleton leader, and the
 * supervisor node runs a stub "leader" that forwards everything it receives to
 * the test actor. The supervisor node then drives the IMAP, analytics and
 * mailer flows through the local `facade` and waits for the corresponding
 * acknowledgement / completion messages.
 *
 * Each test ends with a barrier so that all nodes move through the scenario in
 * lock-step.
 */
abstract class EndToEndSpec extends MultiNodeSpec(BackendConfig)
  with WordSpec with MustMatchers with BeforeAndAfterAll
  with ImplicitSender with BeforeAndAfterEach {

  import BackendConfig._

  // Typed explicitly so that `underlyingActor` is a Facade and needs no casts.
  val facade = TestActorRef[Facade](Props[Facade], "facade")

  /** All registered roles take part from the start. */
  override def initialParticipants: Int = roles.size

  override def beforeAll(): Unit = multiNodeSpecBeforeAll()

  override def afterAll(): Unit = multiNodeSpecAfterAll()

  // Intentionally empty per-test hooks.
  override def beforeEach(): Unit = {
  }

  override def afterEach(): Unit = {
  }

  //#abstract-test
  "The backend" must {

    "start the cluster" in within(15.seconds.dilated) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      // Each service node hosts the singleton manager for its own leader.
      val leadersByNode = Seq(
        (analytics, classOf[AnalyticsLeader]),
        (mailer, classOf[MailerLeader]),
        (imap, classOf[ImapLeader])
      )
      leadersByNode.foreach { case (serviceNode, leaderClass) =>
        runOn(serviceNode) {
          val leader = system.actorOf(
            ClusterSingletonManager.props(
              singletonProps = handOverData => Props.create(leaderClass),
              singletonName = serviceNode.name + "-leader",
              terminationMessage = PoisonPill,
              role = Some(serviceNode.name)
            ),
            serviceNode.name + "-leader-singleton"
          )
          log.info("Created {}", leader.path.toStringWithAddress(leader.path.address))
        }
      }

      Cluster(system).subscribe(facade, classOf[MemberUp])
      Cluster(system) join node(supervisor).address

      // One MemberUp per node is expected on the test actor.
      val members = receiveN(4).collect { case MemberUp(m) => m }.toSet
      val supervisorAddress = node(supervisor).address

      // Assert the full membership first so a missing node is reported as a
      // readable matcher failure instead of a NoSuchElementException.
      members.map(_.address) must be (
        Set(supervisorAddress, node(analytics).address, node(mailer).address, node(imap).address))
      val supervisorMember = members
        .find(_.address == supervisorAddress)
        .getOrElse(fail("supervisor node did not report MemberUp"))

      Cluster(system).unsubscribe(testActor)

      runOn(supervisor) {
        // Stub for the supervisor leader: both the "singleton manager" and its
        // "supervisor-leader" child relay every message to the test actor,
        // keeping the original sender.
        system.actorOf(Props(new Actor {
          def receive = {
            case msg => testActor.tell(msg, sender)
          }

          override def preStart(): Unit = {
            context.actorOf(Props(new Actor {
              def receive = {
                case msg => testActor.tell(msg, sender)
              }
            }), "supervisor-leader")
          }
        }), "supervisor-leader-singleton")
      }

      // Set supervisor in all facades to point to testActor
      val facadeActor = facade.underlyingActor
      facadeActor.roleMembersByAge -= "supervisor"
      facadeActor.roleMembersByAge += ("supervisor" -> SortedSet[Member](supervisorMember))

      enterBarrier("all-up")
    }

    "start service leaders on each node" in within(15.seconds.dilated) {
      runOn(supervisor) {
        val leaderPaths = Seq(
          node(supervisor) / "user" / "supervisor-leader-singleton" / "supervisor-leader",
          node(analytics) / "user" / "analytics-leader-singleton" / "analytics-leader",
          node(imap) / "user" / "imap-leader-singleton" / "imap-leader",
          node(mailer) / "user" / "mailer-leader-singleton" / "mailer-leader"
        )

        awaitAssert {
          // Every leader gets its own correlation id so that the replies can be
          // attributed: four answers from fewer than four leaders must not pass.
          leaderPaths.zipWithIndex.foreach { case (path, id) =>
            system.actorSelection(path) ! Identify(id)
          }
          val identified = receiveN(leaderPaths.size).collect {
            case ActorIdentity(id, Some(ref)) => id -> ref
          }.toMap
          identified.keySet must equal (leaderPaths.indices.toSet)
          identified.values.foreach { ref =>
            log.info("Found {}", ref.path.toStringWithAddress(ref.path.address))
          }
        }
      }
      enterBarrier("leaders-up")
    }

    "kick off the process in the IMAP agent" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = ProcessAccounts(Seq(testAccount), 42L)
        facade ! pa
        expectMsgType[StartedProcessing](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedProcessing](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("imap-done")
    }

    "kick off the analytics service" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = StartAnalytics(Seq(testUser), 42L)
        facade ! pa
        expectMsgType[AcknowledgeAnalytics](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedAnalytics](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("analytics-done")
    }

    "kick off the mailer service" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = StartMailer(Seq(testUser), 42L)
        facade ! pa
        expectMsgType[AcknowledgeMailer](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedMailer](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("mailer-done")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment