Skip to content

Instantly share code, notes, and snippets.

@visualskyrim
Last active August 29, 2015 14:02
Show Gist options
  • Save visualskyrim/00d519e55399b676ae5b to your computer and use it in GitHub Desktop.
Save visualskyrim/00d519e55399b676ae5b to your computer and use it in GitHub Desktop.
AkkaTest.scala
package Synapse
import akka.actor._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch._
import scala.concurrent.{Future, Await, ExecutionContext}
import ExecutionContext.Implicits.global
import akka.dispatch.Futures.sequence
import akka.dispatch.Futures.future
import akka.routing.RoundRobinRouter
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
object Synapse {
/** Number of stimuli that [[Stimulate]] pushes into the pipeline; also handed to the [[Effector]]. */
val taskMark: Int = 10000000

/** Common parent of the messages exchanged between the actors in this object. */
trait Transmitter

/**
 * Asks the node registered under `verb` to do its work.
 *
 * @param verb             key of the target node in the routing table
 * @param uid              identifier of the originating task
 * @param localStartedTime timestamp (from `System.currentTimeMillis()`) taken when the task was first sent
 */
case class Stimulus(
  verb: String,
  uid: String,
  localStartedTime: Long
) extends Transmitter

/**
 * Emitted by a node once its delayed work is done.
 *
 * @param actions        verbs of the follow-up nodes that should be stimulated next
 * @param uid            identifier of the originating task
 * @param localStartTime start timestamp carried over from the triggering [[Stimulus]]
 */
case class Feedback(
  actions: List[String],
  uid: String,
  localStartTime: Long
) extends Transmitter
/** Actor system for the simulation, configured from the `dummy` subtree of the loaded configuration. */
implicit val system: ActorSystem =
  ActorSystem("Synapse", ConfigFactory.load.getConfig("dummy"))

/** Single [[Effector]] that receives a [[Feedback]] from every node once its work is done. */
val notifier: ActorRef = system.actorOf(
  Props(new Effector(taskMark)).withDispatcher("simu-dispatcher"),
  name = "system-notifier"
)

/**
 * Creates a round-robin router of ten [[ActionCellular]] routees on the `simu-dispatcher`.
 *
 * @param nodeName  actor name of the router
 * @param followUps verbs each routee reports as follow-up actions in its feedback
 */
private def spawnActionNode(nodeName: String, followUps: List[String]): ActorRef =
  system.actorOf(
    Props(new ActionCellular(notifier, followUps))
      .withDispatcher("simu-dispatcher")
      .withRouter(RoundRobinRouter(10)),
    name = nodeName
  )

// Nodes are created leaf-first, in the same order as before: E, C, B, A.
val nodeE: ActorRef = spawnActionNode("NodeE", List("F"))
val nodeC: ActorRef = spawnActionNode("NodeC", List("F"))
val nodeB: ActorRef = spawnActionNode("NodeB", List("D", "E"))
val nodeA: ActorRef = spawnActionNode("NodeA", List("B", "C"))

/** Routing table from verb to node; verbs without an entry ("D", "F") have no node to stimulate. */
val conditionReflection: Map[String, ActorRef] = Map(
  "A" -> nodeA,
  "B" -> nodeB,
  "C" -> nodeC,
  "E" -> nodeE
)

/** Entry-point actor that forwards stimuli and feedback follow-ups to the nodes in the routing table. */
val trans: ActorRef = system.actorOf(
  Props(new TransCellular(conditionReflection)).withDispatcher("workers-dispatcher"),
  name = "default"
)
/**
 * Fires `taskMark` stimuli for verb "A" at the `trans` actor, numbering them from 1
 * and stamping each with the wall-clock time at which it is sent, then reports
 * that sending has finished.
 */
def Stimulate(): Unit = {
  (1 to taskMark).foreach { taskId =>
    val sentAt = System.currentTimeMillis()
    trans ! Stimulus("A", taskId.toString, sentAt)
  }
  println("All jobs sent")
}
/**
 * Routes work to the node actors registered in `conditionReflect`.
 *
 *  - A [[Stimulus]] is passed on to the node registered under its verb.
 *  - A [[Feedback]] triggers a new [[Stimulus]] for each of its follow-up actions
 *    that has a registered node; actions without a node are skipped.
 *
 * @param conditionReflect routing table from verb to the actor handling that verb
 */
class TransCellular(conditionReflect: Map[String, ActorRef]) extends Actor {
  def receive: Receive = {
    case stimulus @ Stimulus(verb, _, _) =>
      conditionReflect.get(verb) match {
        case Some(node) => node ! stimulus
        // An unknown verb used to throw NoSuchElementException (Option.get) and fail
        // the actor; report it through the standard unhandled-message channel instead.
        case None => unhandled(stimulus)
      }

    case Feedback(actions, uid, start) =>
      actions.foreach { action =>
        // Single lookup instead of contains + get(...).get.
        conditionReflect.get(action).foreach(_ ! Stimulus(action, uid, start))
      }
  }
}
/**
 * Simulated worker node: for every [[Stimulus]] it schedules, 500 ms later, one
 * [[Feedback]] to the notifier and one to whoever sent the stimulus.
 *
 * @param notifier actor that receives a feedback for every stimulus handled here
 * @param actions  follow-up verbs reported in each feedback
 */
class ActionCellular(notifier: ActorRef, actions: List[String]) extends Actor {
  def receive = {
    case Stimulus(_, uid, localStartTime) =>
      val workDelay = 500.milliseconds
      val timer = context.system.scheduler

      timer.scheduleOnce(workDelay) {
        //println("%s => %s : Finished!".format(verb, uid))
        notifier ! Feedback(actions, uid, localStartTime)
      }
      // sender() is resolved now, while the stimulus is being handled, not when the timer fires.
      timer.scheduleOnce(workDelay, sender(), Feedback(actions, uid, localStartTime))
  }
}
/**
 * Counts incoming [[Feedback]] messages, prints progress, and shuts the actor
 * system down once `taskMark * 4` feedbacks have arrived.
 *
 * Changes from the previous version:
 *  - the progress interval is clamped to at least 1, so a small `taskMark` no longer
 *    causes a division by zero in the modulo check;
 *  - the expected total is computed as a `Long`, so `taskMark * 4` cannot overflow;
 *  - `currentTotalWait` is now actually accumulated (it was never updated, so the
 *    reported average was always 0), and the average is taken over the feedbacks
 *    received.
 *
 * @param taskMark number of tasks fed into the pipeline; four feedbacks are expected per task
 */
class Effector(taskMark: Int) extends Actor {
  /** Wall-clock time at which this actor instance was constructed. */
  val startedTime = System.currentTimeMillis()
  /** Number of feedbacks received so far. */
  var count = 0
  // Not referenced anywhere in this class; kept so existing members stay available.
  val avgWaitMark = scala.collection.mutable.Map[String, Int]()
  /** Sum, in milliseconds, of (arrival time - task start time) over all feedbacks received. */
  var currentTotalWait = 0.0

  // Total number of feedbacks after which the run is considered complete.
  private val expectedFeedbacks: Long = taskMark.toLong * 4
  // Print progress roughly every 5%; never less than every message, to avoid `% 0`.
  private val progressStep: Long = math.max(1L, expectedFeedbacks / 20)

  def receive = {
    case Feedback(_, _, localStartTime) =>
      val now = System.currentTimeMillis()
      count += 1
      currentTotalWait += (now - localStartTime)
      if (count % progressStep == 0) println("%d/%d processed".format(count, expectedFeedbacks))
      if (count == expectedFeedbacks) {
        println("Everything processed in %d seconds".format((now - startedTime) / 1000))
        // Mean latency per feedback, in milliseconds.
        println("Avg. Wait: %f".format(currentTotalWait / expectedFeedbacks))
        context.system.shutdown()
      }
  }
}
}
# Configuration subtree selected with ConfigFactory.load.getConfig("dummy") when the
# "Synapse" actor system is created; everything below is relative to that root.
dummy {
# Akka settings for that actor system.
akka {
# Log level settings for the event log and for stdout.
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
# Dead-letter logging settings.
log-dead-letters = 0
log-dead-letters-during-shutdown = on
actor {
# No overrides for the default dispatcher.
default-dispatcher {
}
}
# Scheduler timer settings.
# NOTE(review): the 500 ms delays scheduled by ActionCellular are presumably
# subject to this tick granularity - confirm against the Akka scheduler docs.
scheduler {
tick-duration = 50ms
ticks-per-wheel = 128
}
}
# Dispatcher referenced via withDispatcher("simu-dispatcher") by the notifier and
# by the NodeA/NodeB/NodeC/NodeE routers.
simu-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
# NOTE(review): per the Akka dispatcher docs the pool size is derived from
# available processors * parallelism-factor, bounded by min and max - confirm
# for the Akka version in use.
fork-join-executor {
parallelism-min = 0
parallelism-max = 600
parallelism-factor = 3.0
}
# NOTE(review): whether a dispatcher-level mailbox-capacity is honored depends on
# the mailbox type and Akka version - confirm.
mailbox-capacity = 100000
}
# Dispatcher referenced via withDispatcher("workers-dispatcher") by the "default"
# (TransCellular) actor. No explicit "type" is set here, unlike simu-dispatcher.
workers-dispatcher {
mailbox-capacity = 10000
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 0
parallelism-max = 6000
parallelism-factor = 3.0
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment