Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Last active December 21, 2015 04:09
Show Gist options
  • Save ryantanner/6247240 to your computer and use it in GitHub Desktop.
Save ryantanner/6247240 to your computer and use it in GitHub Desktop.
/**
 * Multi-node test configuration for the backend cluster.
 *
 * Registers one test role per service (supervisor, analytics, imap, mailer) and gives every
 * node the shared settings in `nodeConfig`, layered underneath a one-line config that sets
 * `akka.cluster.roles` to the node's own role.
 */
object BackendConfig extends MultiNodeConfig {
  // register the named roles (nodes) of the test
  val supervisor = role("supervisor")
  val analytics = role("analytics")
  val imap = role("imap")
  val mailer = role("mailer")

  // Settings shared by every node:
  //  - cluster actor-ref provider, with lifecycle / dead-letter / cluster-info logging turned off
  //  - test time factor 4 and failure-detector threshold 8
  //  - JMX metrics collector (the sigar native lib is not on the path in tests)
  //  - one cluster-aware round-robin `nodeRouter` under each service leader singleton
  //    (100 instances total, at most 3 per node, restricted to the matching cluster role)
  //  - round-robin pools of 5 for the imap reader/writer and the mailer
  //    stats-reader/sender/renderer routers below each nodeRouter routee
  //
  // A plain (non-interpolated) string is used on purpose: the text contains nothing to
  // interpolate, and an `s` prefix would make any future `$` in the HOCON mean something else.
  //
  // NOTE(review): this val shares its name with the `nodeConfig(role)(config)` registration
  // call used at the bottom of this object (presumably inherited from MultiNodeConfig). The
  // name is kept so existing references keep working — consider renaming it in a follow-up.
  val nodeConfig: Config = ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.cluster.log-info = off
akka.actor.debug.receive = off
akka.test.timefactor = 4
akka.cluster.failure-detector.threshold = 8
# don't use sigar for tests, native lib not in path
akka.cluster.metrics.collector-class = akka.cluster.JmxMetricsCollector
akka.actor.deployment {
/analytics-leader-singleton/analytics-leader/nodeRouter {
router = round-robin
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-role = analytics
}
}
/mailer-leader-singleton/mailer-leader/nodeRouter {
router = round-robin
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-role = mailer
}
}
/imap-leader-singleton/imap-leader/nodeRouter {
router = round-robin
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-role = imap
}
}
/imap-leader-singleton/imap-leader/nodeRouter/"*"/imap-message_reader_router {
router = round-robin
nr-of-instances = 5
}
/imap-leader-singleton/imap-leader/nodeRouter/"*"/imap-message_writer_router {
router = round-robin
nr-of-instances = 5
}
/mailer-leader-singleton/mailer-leader/nodeRouter/"*"/statsReaderRouter {
router = round-robin
nr-of-instances = 5
}
/mailer-leader-singleton/mailer-leader/nodeRouter/"*"/senderRouter {
router = round-robin
nr-of-instances = 5
}
/mailer-leader-singleton/mailer-leader/nodeRouter/"*"/rendererRouter {
router = round-robin
nr-of-instances = 5
}
}
""")

  // Per-role overrides: each one only sets the cluster role of the node it is applied to.
  val supervisorConfig = ConfigFactory.parseString("""
akka.cluster.roles = [ supervisor ]
""")
  val analyticsConfig = ConfigFactory.parseString("""
akka.cluster.roles = [ analytics ]
""")
  val mailerConfig = ConfigFactory.parseString("""
akka.cluster.roles = [ mailer ]
""")
  val imapConfig = ConfigFactory.parseString("""
akka.cluster.roles = [ imap ]
""")

  // Register each role's config with the shared settings as fallback, so role-specific keys
  // win and everything else comes from `nodeConfig`. Registration order matches the original
  // one-call-per-role version: supervisor, analytics, mailer, imap.
  Seq(
    supervisor -> supervisorConfig,
    analytics -> analyticsConfig,
    mailer -> mailerConfig,
    imap -> imapConfig
  ).foreach { case (roleName, roleConfig) =>
    nodeConfig(roleName)(roleConfig.withFallback(nodeConfig))
  }
}
// Concrete, instantiable subclasses of the abstract EndToEndSpec; they add no behaviour of
// their own. There are four of them, matching the four roles registered in BackendConfig
// (EndToEndSpec declares initialParticipants = roles.size).
// NOTE(review): presumably each class is launched in its own JVM by the multi-JVM test
// runner, keyed on the `MultiJvm<N>` name suffix — confirm against the build setup.
class EndToEndSpecMultiJvm1 extends EndToEndSpec
class EndToEndSpecMultiJvm2 extends EndToEndSpec
class EndToEndSpecMultiJvm3 extends EndToEndSpec
class EndToEndSpecMultiJvm4 extends EndToEndSpec
/**
 * End-to-end multi-node spec for the backend cluster described by [[BackendConfig]].
 *
 * The same spec body runs on every participating node; `runOn(...)` limits a section to
 * particular roles and `enterBarrier(...)` keeps the nodes in step between test cases.
 *
 * Flow:
 *  1. form the cluster and start a leader singleton manager on each service node,
 *  2. from the supervisor node, verify that all four leaders can be identified,
 *  3. from the supervisor node, drive the IMAP, analytics and mailer services through the
 *     local `facade` and wait for their acknowledgement and completion messages.
 *
 * `testAccount` and `testUser` are referenced below but are not defined in this class body;
 * they must be supplied elsewhere.
 */
abstract class EndToEndSpec extends MultiNodeSpec(BackendConfig)
  with WordSpec with MustMatchers with BeforeAndAfterAll
  with ImplicitSender with BeforeAndAfterEach {

  import BackendConfig._

  // Facade under test, created synchronously so the spec can reach into its state.
  // NOTE(review): no actor type parameter is given here, which is why the underlying actor
  // has to be cast to Facade further down — consider TestActorRef[Facade](...) in a follow-up.
  val facade = TestActorRef(Props[Facade], "facade")

  // Every role registered in BackendConfig takes part from the start.
  override def initialParticipants: Int = roles.size

  override def beforeAll(): Unit = multiNodeSpecBeforeAll()
  override def afterAll(): Unit = multiNodeSpecAfterAll()

  // Intentionally empty: no per-test setup or teardown.
  override def beforeEach(): Unit = ()
  override def afterEach(): Unit = ()

  "The backend" must {

    "start the cluster" in within(15.seconds.dilated) {
      Cluster(system).subscribe(testActor, classOf[MemberUp])
      expectMsgClass(classOf[CurrentClusterState])

      // Each service node starts the singleton manager for its own leader. The actor names
      // must line up with the deployment paths configured in BackendConfig.
      Seq(
        (analytics, classOf[AnalyticsLeader]),
        (mailer, classOf[MailerLeader]),
        (imap, classOf[ImapLeader])
      ).foreach { case (serviceRole, leaderClass) =>
        runOn(serviceRole) {
          val leader = system.actorOf(
            ClusterSingletonManager.props(
              singletonProps = handOverData => Props.create(leaderClass),
              singletonName = s"${serviceRole.name}-leader",
              terminationMessage = PoisonPill,
              role = Some(serviceRole.name)
            ),
            s"${serviceRole.name}-leader-singleton"
          )
          log.info("Created {}", leader.path.toStringWithAddress(leader.path.address))
        }
      }

      Cluster(system).subscribe(facade, classOf[MemberUp])
      Cluster(system).join(node(supervisor).address)

      // Wait until all four nodes have been reported as up.
      val members = receiveN(4).collect { case MemberUp(m) => m }.toSet
      members.map(_.address) must be (
        Set(node(supervisor).address, node(analytics).address, node(mailer).address, node(imap).address))

      val supervisorAddress = node(supervisor).address
      val supervisorMember = members
        .find(_.address == supervisorAddress)
        .getOrElse(fail(s"no MemberUp received for the supervisor node at $supervisorAddress"))

      Cluster(system).unsubscribe(testActor)

      // On the supervisor node, stand in for the real supervisor leader: both the
      // "supervisor-leader-singleton" actor and its "supervisor-leader" child simply forward
      // whatever they receive to testActor, keeping the original sender.
      runOn(supervisor) {
        system.actorOf(Props(new Actor {
          def receive = {
            case x => testActor.tell(x, sender)
          }
          override def preStart(): Unit = {
            context.actorOf(Props(new Actor {
              def receive = {
                case x => testActor.tell(x, sender)
              }
            }), "supervisor-leader")
          }
        }), "supervisor-leader-singleton")
      }

      // Set supervisor in all facades to point to testActor
      val facadeActor = facade.underlyingActor.asInstanceOf[Facade]
      facadeActor.roleMembersByAge -= "supervisor"
      facadeActor.roleMembersByAge += ("supervisor" -> SortedSet[Member](supervisorMember))

      enterBarrier("all-up")
    }

    "start service leaders on each node" in within(15.seconds.dilated) {
      runOn(supervisor) {
        awaitAssert {
          val leaderPaths = Seq(
            node(supervisor) / "user" / "supervisor-leader-singleton" / "supervisor-leader",
            node(analytics) / "user" / "analytics-leader-singleton" / "analytics-leader",
            node(imap) / "user" / "imap-leader-singleton" / "imap-leader",
            node(mailer) / "user" / "mailer-leader-singleton" / "mailer-leader"
          )
          // Each leader gets its own correlation id so the replies can be told apart.
          leaderPaths.zipWithIndex.foreach { case (path, correlationId) =>
            system.actorSelection(path) ! Identify(correlationId)
          }
          // Keyed by correlation id: duplicate replies for one leader collapse into a single
          // entry, so the size check only passes when every leader was resolved.
          val found = receiveN(leaderPaths.size).collect {
            case ActorIdentity(correlationId, Some(ref)) => correlationId -> ref
          }.toMap
          found.size must equal (leaderPaths.size)
          found.values.foreach(ref => log.info("Found " + ref.path.toStringWithAddress(ref.path.address)))
        }
      }
      enterBarrier("leaders-up")
    }

    // The three service tests share one shape: only the supervisor node sends the request
    // through the facade and waits for the acknowledgement followed by the completion
    // message, both carrying the request's uuid. The other nodes go straight to the barrier.

    "kick off the process in the IMAP agent" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = ProcessAccounts(Seq(testAccount), 42L)
        facade ! pa
        expectMsgType[StartedProcessing](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedProcessing](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("imap-done")
    }

    "kick off the analytics service" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = StartAnalytics(Seq(testUser), 42L)
        facade ! pa
        expectMsgType[AcknowledgeAnalytics](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedAnalytics](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("analytics-done")
    }

    "kick off the mailer service" in within(70.seconds.dilated) {
      runOn(supervisor) {
        val pa = StartMailer(Seq(testUser), 42L)
        facade ! pa
        expectMsgType[AcknowledgeMailer](5.seconds.dilated).uuid must be(pa.uuid)
        expectMsgType[CompletedMailer](60.seconds.dilated).uuid must be(pa.uuid)
      }
      enterBarrier("mailer-done")
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment