Created
September 22, 2016 13:30
-
-
Save astonbitecode/efb7f4ae508edb003fbdf0574a7c9089 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package aston.tests | |
import scala.concurrent.Await | |
import scala.concurrent.Future | |
import scala.concurrent.duration.DurationInt | |
import org.junit.runner.RunWith | |
import org.specs2.mutable | |
import org.specs2.runner.JUnitRunner | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.ActorSystem | |
import akka.actor.Cancellable | |
import akka.actor.Props | |
import akka.actor.Stash | |
import akka.actor.actorRef2Scala | |
import akka.pattern.ask | |
import akka.pattern.pipe | |
import akka.routing.RoundRobinPool | |
import akka.util.Timeout | |
@RunWith(classOf[JUnitRunner]) | |
class PrimerSpec extends mutable.Specification { | |
"Primer ".txt | |
sequential | |
val numberOfRequests = 1000000 | |
val warmingIterations = 1000000 | |
"This is " >> { | |
"Test 1" >> { | |
val actorSystem = ActorSystem("myActorSystem") | |
val askingActor = actorSystem.actorOf(Props(new AskingActor())) | |
println("Test 1 warming") | |
execute(actorSystem, askingActor, warmingIterations) | |
for (_ <- 0 to 10) { | |
println("Test 1 starts") | |
val took = execute(actorSystem, askingActor, numberOfRequests) | |
println(s"Executing $numberOfRequests took ${took} milliseconds") | |
} | |
true must beEqualTo(true) | |
} | |
"Test 2" >> { | |
val actorSystem = ActorSystem("myActorSystem") | |
val tellingActor = actorSystem.actorOf(Props(new TellingActor())) | |
println("Test 2 warming") | |
execute(actorSystem, tellingActor, warmingIterations) | |
for (_ <- 0 to 10) { | |
println("Test 2 starts") | |
val took = execute(actorSystem, tellingActor, numberOfRequests) | |
println(s"Executing $numberOfRequests took ${took} milliseconds") | |
} | |
true must beEqualTo(true) | |
} | |
"Test 4" >> { | |
val actorSystem = ActorSystem("myActorSystem") | |
val tellingActor = actorSystem.actorOf(Props(new TellingActorWithCachedIntermediateRouted())) | |
println("Test 4 warming") | |
execute(actorSystem, tellingActor, warmingIterations) | |
for (_ <- 0 to 10) { | |
println("Test 4 starts") | |
val took = execute(actorSystem, tellingActor, numberOfRequests) | |
println(s"Executing $numberOfRequests took ${took} milliseconds") | |
} | |
true must beEqualTo(true) | |
} | |
} | |
def execute(actorSystem: ActorSystem, askingActor: ActorRef, iterations: Int): Long = { | |
implicit val timeout = Timeout(10.minutes) | |
val testActor = actorSystem.actorOf(Props(new TestExecutorActor())) | |
val start = System.currentTimeMillis() | |
val f = testActor ? Test(askingActor, iterations) | |
Await.result(f, 30.minutes) | |
val end = System.currentTimeMillis() | |
end - start | |
} | |
} | |
case class English(text: String) | |
trait Translation { | |
val text: String | |
} | |
case class FrenchTranslation(text: String) extends Translation | |
case class SpanishTranslation(text: String) extends Translation | |
case class GermanTranslation(text: String) extends Translation | |
case class TranslationsResult(translations: List[Translation], replyTo: ActorRef) | |
case class Failure(replyTo: ActorRef, description: String) | |
case class Test(actorToTest: ActorRef, iterations: Int) | |
class TestExecutorActor extends Actor { | |
var responses = 0 | |
var expectedResponses = 0 | |
var replyTo: ActorRef = _ | |
override def receive: Receive = { | |
case Test(actorToTest, iterations) => { | |
expectedResponses = iterations | |
replyTo = sender | |
for (i <- 0 to iterations) { | |
actorToTest ! English(i.toString) | |
} | |
} | |
case l: List[_] => { | |
if (responses == expectedResponses) { | |
replyTo ! responses | |
} else { | |
responses = responses + 1 | |
} | |
} | |
case error: akka.actor.Status.Failure => replyTo ! error | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
class AskingActor extends Actor { | |
val translationActors = List( | |
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new GermanTranslationService)))) | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
override def receive: Receive = { | |
case english @ English(text) => { | |
val replyTo = sender | |
val listOfFutures = for (translationActor <- translationActors) yield { | |
(translationActor ? english).mapTo[Translation] | |
} | |
val futureOfLists = Future.sequence(listOfFutures) | |
futureOfLists.map(TranslationsResult(_, replyTo)).pipeTo(self) | |
} | |
case TranslationsResult(result, replyTo) => { | |
replyTo ! result | |
} | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
class TellingActor extends Actor { | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
var executions = 0 | |
override def receive: Receive = { | |
case english: English => { | |
executions = executions + 1 | |
val intermediateActor = context.actorOf(Props(new IntermediateActor())) | |
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor timed out on execution #" + executions)) | |
intermediateActor ! (english, sender, cancellable) | |
} | |
case TranslationsResult(result, replyTo) => { | |
replyTo ! result | |
} | |
case Failure(replyTo: ActorRef, description) => { | |
replyTo ! akka.actor.Status.Failure(new RuntimeException(description)) | |
} | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
class IntermediateActor extends Actor { | |
val translationActors = List( | |
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new GermanTranslationService)))) | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
var localcancellable: Cancellable = _ | |
var cancellable: Cancellable = _ | |
override def receive: Receive = { | |
case (english: English, originator: ActorRef, to: Cancellable) => { | |
context.become(waitingForTranslations(None, None, None, originator, sender)) | |
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out")) | |
cancellable = to | |
for (translationActor <- translationActors) { | |
translationActor ! english | |
} | |
} | |
} | |
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = { | |
case translation: FrenchTranslation => { | |
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo)) | |
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo) | |
} | |
case translation: SpanishTranslation => { | |
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo)) | |
finalizeIfNeeded(french, Some(translation), german, originator, replyTo) | |
} | |
case translation: GermanTranslation => { | |
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo)) | |
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo) | |
} | |
case Failure(replyTo, description) => { | |
val append = s"having $french, $spanish, $german" | |
replyTo ! akka.actor.Status.Failure(new RuntimeException(description + " " + append)) | |
cancellable.cancel | |
localcancellable.cancel | |
} | |
} | |
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = { | |
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) { | |
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator) | |
cancellable.cancel | |
localcancellable.cancel | |
context.stop(self) | |
} | |
} | |
} | |
class TellingActorWithCachedIntermediate extends Actor { | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
var executions = 0 | |
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor())) | |
override def receive: Receive = { | |
case english: English => { | |
executions = executions + 1 | |
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions)) | |
intermediateActor ! (english, sender, cancellable) | |
} | |
case TranslationsResult(result, replyTo) => { | |
replyTo ! result | |
} | |
case Failure(replyTo: ActorRef, description) => { | |
replyTo ! akka.actor.Status.Failure(new RuntimeException(description)) | |
} | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
class TellingActorWithCachedIntermediateRouted extends Actor { | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
var executions = 0 | |
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor()).withRouter(RoundRobinPool(1000))) | |
override def receive: Receive = { | |
case english: English => { | |
executions = executions + 1 | |
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions)) | |
intermediateActor ! (english, sender, cancellable) | |
} | |
case TranslationsResult(result, replyTo) => { | |
replyTo ! result | |
} | |
case Failure(replyTo: ActorRef, description) => { | |
replyTo ! akka.actor.Status.Failure(new RuntimeException(description)) | |
} | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
class CachedIntermediateActor extends Actor with Stash { | |
val translationActors = List( | |
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))), | |
context.actorOf(Props(new TranslatingActor(new GermanTranslationService)))) | |
implicit val timeout = Timeout(10.seconds) | |
implicit val ec = context.dispatcher | |
var localcancellable: Cancellable = _ | |
var cancellable: Cancellable = _ | |
override def receive: Receive = { | |
case (english: English, originator: ActorRef, to: Cancellable) => { | |
context.become(waitingForTranslations(None, None, None, originator, sender)) | |
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out")) | |
cancellable = to | |
for (translationActor <- translationActors) { | |
translationActor ! english | |
} | |
} | |
} | |
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = { | |
case (english: English, originator: ActorRef, to: Cancellable) => stash | |
case translation: FrenchTranslation => { | |
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo)) | |
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo) | |
} | |
case translation: SpanishTranslation => { | |
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo)) | |
finalizeIfNeeded(french, Some(translation), german, originator, replyTo) | |
} | |
case translation: GermanTranslation => { | |
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo)) | |
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo) | |
} | |
case Failure(replyTo, description) => { | |
val append = s"having $french, $spanish, $german" | |
replyTo ! akka.actor.Status.Failure(new RuntimeException(description + " " + append)) | |
cancellable.cancel | |
localcancellable.cancel | |
unstashAll | |
} | |
} | |
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = { | |
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) { | |
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator) | |
cancellable.cancel | |
localcancellable.cancel | |
context.become(receive) | |
unstashAll | |
} | |
} | |
} | |
class TranslatingActor(translationService: TranslationService) extends Actor { | |
override def receive: Receive = { | |
case English(text) => { | |
sender ! translationService.translate(text) | |
} | |
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})")) | |
} | |
} | |
trait TranslationService { | |
def translate(text: String): Translation | |
} | |
class FrenchTranslationService extends TranslationService { | |
override def translate(text: String): Translation = FrenchTranslation(s"This is the french translation of '$text'") | |
} | |
class SpanishTranslationService extends TranslationService { | |
override def translate(text: String): Translation = SpanishTranslation(s"This is the spanish translation of '$text'") | |
} | |
class GermanTranslationService extends TranslationService { | |
override def translate(text: String): Translation = GermanTranslation(s"This is the german translation of '$text'") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment