package aston.tests
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import org.junit.runner.RunWith
import org.specs2.mutable
import org.specs2.runner.JUnitRunner
import akka.pattern.ask
import akka.pattern.pipe
import akka.routing.RoundRobinPool
import akka.util.Timeout
class PrimerSpec extends mutable.Specification {
"Primer ".txt
val numberOfRequests = 1000000
val warmingIterations = 1000000
"This is " >> {
"Test 1" >> {
val actorSystem = ActorSystem("myActorSystem")
val askingActor = actorSystem.actorOf(Props(new AskingActor()))
println("Test 1 warming")
execute(actorSystem, askingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 1 starts")
val took = execute(actorSystem, askingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
true must beEqualTo(true)
"Test 2" >> {
val actorSystem = ActorSystem("myActorSystem")
val tellingActor = actorSystem.actorOf(Props(new TellingActor()))
println("Test 2 warming")
execute(actorSystem, tellingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 2 starts")
val took = execute(actorSystem, tellingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
true must beEqualTo(true)
"Test 4" >> {
val actorSystem = ActorSystem("myActorSystem")
val tellingActor = actorSystem.actorOf(Props(new TellingActorWithCachedIntermediateRouted()))
println("Test 4 warming")
execute(actorSystem, tellingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 4 starts")
val took = execute(actorSystem, tellingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
true must beEqualTo(true)
def execute(actorSystem: ActorSystem, askingActor: ActorRef, iterations: Int): Long = {
implicit val timeout = Timeout(10.minutes)
val testActor = actorSystem.actorOf(Props(new TestExecutorActor()))
val start = System.currentTimeMillis()
val f = testActor ? Test(askingActor, iterations)
Await.result(f, 30.minutes)
val end = System.currentTimeMillis()
end - start
case class English(text: String)
trait Translation {
val text: String
case class FrenchTranslation(text: String) extends Translation
case class SpanishTranslation(text: String) extends Translation
case class GermanTranslation(text: String) extends Translation
case class TranslationsResult(translations: List[Translation], replyTo: ActorRef)
case class Failure(replyTo: ActorRef, description: String)
case class Test(actorToTest: ActorRef, iterations: Int)
class TestExecutorActor extends Actor {
var responses = 0
var expectedResponses = 0
var replyTo: ActorRef = _
override def receive: Receive = {
case Test(actorToTest, iterations) => {
expectedResponses = iterations
replyTo = sender
for (i <- 0 to iterations) {
actorToTest ! English(i.toString)
case l: List[_] => {
if (responses == expectedResponses) {
replyTo ! responses
} else {
responses = responses + 1
case error: => replyTo ! error
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
class AskingActor extends Actor {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
override def receive: Receive = {
case english @ English(text) => {
val replyTo = sender
val listOfFutures = for (translationActor <- translationActors) yield {
(translationActor ? english).mapTo[Translation]
val futureOfLists = Future.sequence(listOfFutures), replyTo)).pipeTo(self)
case TranslationsResult(result, replyTo) => {
replyTo ! result
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
class TellingActor extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
override def receive: Receive = {
case english: English => {
executions = executions + 1
val intermediateActor = context.actorOf(Props(new IntermediateActor()))
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
case TranslationsResult(result, replyTo) => {
replyTo ! result
case Failure(replyTo: ActorRef, description) => {
replyTo ! RuntimeException(description))
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
class IntermediateActor extends Actor {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var localcancellable: Cancellable = _
var cancellable: Cancellable = _
override def receive: Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => {
context.become(waitingForTranslations(None, None, None, originator, sender))
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out"))
cancellable = to
for (translationActor <- translationActors) {
translationActor ! english
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = {
case translation: FrenchTranslation => {
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo))
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo)
case translation: SpanishTranslation => {
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo))
finalizeIfNeeded(french, Some(translation), german, originator, replyTo)
case translation: GermanTranslation => {
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo))
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo)
case Failure(replyTo, description) => {
val append = s"having $french, $spanish, $german"
replyTo ! RuntimeException(description + " " + append))
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = {
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) {
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator)
class TellingActorWithCachedIntermediate extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor()))
override def receive: Receive = {
case english: English => {
executions = executions + 1
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
case TranslationsResult(result, replyTo) => {
replyTo ! result
case Failure(replyTo: ActorRef, description) => {
replyTo ! RuntimeException(description))
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
class TellingActorWithCachedIntermediateRouted extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor()).withRouter(RoundRobinPool(1000)))
override def receive: Receive = {
case english: English => {
executions = executions + 1
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
case TranslationsResult(result, replyTo) => {
replyTo ! result
case Failure(replyTo: ActorRef, description) => {
replyTo ! RuntimeException(description))
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
class CachedIntermediateActor extends Actor with Stash {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var localcancellable: Cancellable = _
var cancellable: Cancellable = _
override def receive: Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => {
context.become(waitingForTranslations(None, None, None, originator, sender))
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out"))
cancellable = to
for (translationActor <- translationActors) {
translationActor ! english
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => stash
case translation: FrenchTranslation => {
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo))
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo)
case translation: SpanishTranslation => {
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo))
finalizeIfNeeded(french, Some(translation), german, originator, replyTo)
case translation: GermanTranslation => {
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo))
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo)
case Failure(replyTo, description) => {
val append = s"having $french, $spanish, $german"
replyTo ! RuntimeException(description + " " + append))
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = {
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) {
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator)
class TranslatingActor(translationService: TranslationService) extends Actor {
override def receive: Receive = {
case English(text) => {
sender ! translationService.translate(text)
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
trait TranslationService {
def translate(text: String): Translation
class FrenchTranslationService extends TranslationService {
override def translate(text: String): Translation = FrenchTranslation(s"This is the french translation of '$text'")
class SpanishTranslationService extends TranslationService {
override def translate(text: String): Translation = SpanishTranslation(s"This is the spanish translation of '$text'")
class GermanTranslationService extends TranslationService {
override def translate(text: String): Translation = GermanTranslation(s"This is the german translation of '$text'")
