Skip to content

Instantly share code, notes, and snippets.

@astonbitecode
Created September 22, 2016 13:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save astonbitecode/efb7f4ae508edb003fbdf0574a7c9089 to your computer and use it in GitHub Desktop.
Save astonbitecode/efb7f4ae508edb003fbdf0574a7c9089 to your computer and use it in GitHub Desktop.
package aston.tests
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import org.junit.runner.RunWith
import org.specs2.mutable
import org.specs2.runner.JUnitRunner
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Cancellable
import akka.actor.Props
import akka.actor.Stash
import akka.actor.actorRef2Scala
import akka.pattern.ask
import akka.pattern.pipe
import akka.routing.RoundRobinPool
import akka.util.Timeout
@RunWith(classOf[JUnitRunner])
class PrimerSpec extends mutable.Specification {
"Primer ".txt
sequential
val numberOfRequests = 1000000
val warmingIterations = 1000000
"This is " >> {
"Test 1" >> {
val actorSystem = ActorSystem("myActorSystem")
val askingActor = actorSystem.actorOf(Props(new AskingActor()))
println("Test 1 warming")
execute(actorSystem, askingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 1 starts")
val took = execute(actorSystem, askingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
}
true must beEqualTo(true)
}
"Test 2" >> {
val actorSystem = ActorSystem("myActorSystem")
val tellingActor = actorSystem.actorOf(Props(new TellingActor()))
println("Test 2 warming")
execute(actorSystem, tellingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 2 starts")
val took = execute(actorSystem, tellingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
}
true must beEqualTo(true)
}
"Test 4" >> {
val actorSystem = ActorSystem("myActorSystem")
val tellingActor = actorSystem.actorOf(Props(new TellingActorWithCachedIntermediateRouted()))
println("Test 4 warming")
execute(actorSystem, tellingActor, warmingIterations)
for (_ <- 0 to 10) {
println("Test 4 starts")
val took = execute(actorSystem, tellingActor, numberOfRequests)
println(s"Executing $numberOfRequests took ${took} milliseconds")
}
true must beEqualTo(true)
}
}
def execute(actorSystem: ActorSystem, askingActor: ActorRef, iterations: Int): Long = {
implicit val timeout = Timeout(10.minutes)
val testActor = actorSystem.actorOf(Props(new TestExecutorActor()))
val start = System.currentTimeMillis()
val f = testActor ? Test(askingActor, iterations)
Await.result(f, 30.minutes)
val end = System.currentTimeMillis()
end - start
}
}
case class English(text: String)
trait Translation {
val text: String
}
case class FrenchTranslation(text: String) extends Translation
case class SpanishTranslation(text: String) extends Translation
case class GermanTranslation(text: String) extends Translation
case class TranslationsResult(translations: List[Translation], replyTo: ActorRef)
case class Failure(replyTo: ActorRef, description: String)
case class Test(actorToTest: ActorRef, iterations: Int)
class TestExecutorActor extends Actor {
var responses = 0
var expectedResponses = 0
var replyTo: ActorRef = _
override def receive: Receive = {
case Test(actorToTest, iterations) => {
expectedResponses = iterations
replyTo = sender
for (i <- 0 to iterations) {
actorToTest ! English(i.toString)
}
}
case l: List[_] => {
if (responses == expectedResponses) {
replyTo ! responses
} else {
responses = responses + 1
}
}
case error: akka.actor.Status.Failure => replyTo ! error
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
class AskingActor extends Actor {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
override def receive: Receive = {
case english @ English(text) => {
val replyTo = sender
val listOfFutures = for (translationActor <- translationActors) yield {
(translationActor ? english).mapTo[Translation]
}
val futureOfLists = Future.sequence(listOfFutures)
futureOfLists.map(TranslationsResult(_, replyTo)).pipeTo(self)
}
case TranslationsResult(result, replyTo) => {
replyTo ! result
}
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
class TellingActor extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
override def receive: Receive = {
case english: English => {
executions = executions + 1
val intermediateActor = context.actorOf(Props(new IntermediateActor()))
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
}
case TranslationsResult(result, replyTo) => {
replyTo ! result
}
case Failure(replyTo: ActorRef, description) => {
replyTo ! akka.actor.Status.Failure(new RuntimeException(description))
}
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
class IntermediateActor extends Actor {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var localcancellable: Cancellable = _
var cancellable: Cancellable = _
override def receive: Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => {
context.become(waitingForTranslations(None, None, None, originator, sender))
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out"))
cancellable = to
for (translationActor <- translationActors) {
translationActor ! english
}
}
}
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = {
case translation: FrenchTranslation => {
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo))
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo)
}
case translation: SpanishTranslation => {
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo))
finalizeIfNeeded(french, Some(translation), german, originator, replyTo)
}
case translation: GermanTranslation => {
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo))
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo)
}
case Failure(replyTo, description) => {
val append = s"having $french, $spanish, $german"
replyTo ! akka.actor.Status.Failure(new RuntimeException(description + " " + append))
cancellable.cancel
localcancellable.cancel
}
}
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = {
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) {
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator)
cancellable.cancel
localcancellable.cancel
context.stop(self)
}
}
}
class TellingActorWithCachedIntermediate extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor()))
override def receive: Receive = {
case english: English => {
executions = executions + 1
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
}
case TranslationsResult(result, replyTo) => {
replyTo ! result
}
case Failure(replyTo: ActorRef, description) => {
replyTo ! akka.actor.Status.Failure(new RuntimeException(description))
}
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
class TellingActorWithCachedIntermediateRouted extends Actor {
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var executions = 0
val intermediateActor = context.actorOf(Props(new CachedIntermediateActor()).withRouter(RoundRobinPool(1000)))
override def receive: Receive = {
case english: English => {
executions = executions + 1
val cancellable = context.system.scheduler.scheduleOnce(timeout.duration, self, Failure(sender, "Telling Actor with cached intermediate timed out on execution #" + executions))
intermediateActor ! (english, sender, cancellable)
}
case TranslationsResult(result, replyTo) => {
replyTo ! result
}
case Failure(replyTo: ActorRef, description) => {
replyTo ! akka.actor.Status.Failure(new RuntimeException(description))
}
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
class CachedIntermediateActor extends Actor with Stash {
val translationActors = List(
context.actorOf(Props(new TranslatingActor(new FrenchTranslationService))),
context.actorOf(Props(new TranslatingActor(new SpanishTranslationService))),
context.actorOf(Props(new TranslatingActor(new GermanTranslationService))))
implicit val timeout = Timeout(10.seconds)
implicit val ec = context.dispatcher
var localcancellable: Cancellable = _
var cancellable: Cancellable = _
override def receive: Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => {
context.become(waitingForTranslations(None, None, None, originator, sender))
localcancellable = context.system.scheduler.scheduleOnce(timeout.duration, sender, Failure(originator, "Intermediate Actor timed out"))
cancellable = to
for (translationActor <- translationActors) {
translationActor ! english
}
}
}
def waitingForTranslations(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Receive = {
case (english: English, originator: ActorRef, to: Cancellable) => stash
case translation: FrenchTranslation => {
context.become(waitingForTranslations(Some(translation), spanish, german, originator, replyTo))
finalizeIfNeeded(Some(translation), spanish, german, originator, replyTo)
}
case translation: SpanishTranslation => {
context.become(waitingForTranslations(french, Some(translation), german, originator, replyTo))
finalizeIfNeeded(french, Some(translation), german, originator, replyTo)
}
case translation: GermanTranslation => {
context.become(waitingForTranslations(french, spanish, Some(translation), originator, replyTo))
finalizeIfNeeded(french, spanish, Some(translation), originator, replyTo)
}
case Failure(replyTo, description) => {
val append = s"having $french, $spanish, $german"
replyTo ! akka.actor.Status.Failure(new RuntimeException(description + " " + append))
cancellable.cancel
localcancellable.cancel
unstashAll
}
}
def finalizeIfNeeded(french: Option[Translation], spanish: Option[Translation], german: Option[Translation], originator: ActorRef, replyTo: ActorRef): Unit = {
if (french.nonEmpty && spanish.nonEmpty && german.nonEmpty) {
replyTo ! TranslationsResult(List(french.get, spanish.get, german.get), originator)
cancellable.cancel
localcancellable.cancel
context.become(receive)
unstashAll
}
}
}
class TranslatingActor(translationService: TranslationService) extends Actor {
override def receive: Receive = {
case English(text) => {
sender ! translationService.translate(text)
}
case other => println((s"${this.getClass} cannot handle $other of type (${other.getClass})"))
}
}
trait TranslationService {
def translate(text: String): Translation
}
class FrenchTranslationService extends TranslationService {
override def translate(text: String): Translation = FrenchTranslation(s"This is the french translation of '$text'")
}
class SpanishTranslationService extends TranslationService {
override def translate(text: String): Translation = SpanishTranslation(s"This is the spanish translation of '$text'")
}
class GermanTranslationService extends TranslationService {
override def translate(text: String): Translation = GermanTranslation(s"This is the german translation of '$text'")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment