Skip to content

Instantly share code, notes, and snippets.

Last active August 29, 2015 14:02
Show Gist options
  • Save visualskyrim/00d519e55399b676ae5b to your computer and use it in GitHub Desktop.
Save visualskyrim/00d519e55399b676ae5b to your computer and use it in GitHub Desktop.
package Synapse
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.dispatch._
import akka.dispatch.Futures.future
import akka.dispatch.Futures.sequence
import akka.pattern.ask
import akka.routing.RoundRobinRouter
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.{Future, Await, ExecutionContext}
import scala.concurrent.duration._
/** Wiring and entry point for the Synapse message-passing load test.
  *
  * Initialising this object creates the actor system, one [[Effector]] that
  * counts feedback, four [[ActionCellular]] nodes and one [[TransCellular]]
  * router. [[Stimulate]] then pushes `taskMark` stimuli into the router.
  */
object Synapse {
  /** Number of stimuli sent by [[Stimulate]]. */
  val taskMark: Int = 10000000

  /** Common parent of the messages exchanged between the actors. */
  trait Transmitter

  /** Asks the node registered under `verb` to do its work for task `uid`. */
  case class Stimulus(verb: String, uid: String, localStartedTime: Long) extends Transmitter

  /** Reports finished work; `actions` names the follow-up verbs for task `uid`. */
  case class Feedback(actions: List[String], uid: String, localStartTime: Long) extends Transmitter

  // Only the "dummy" subtree of the loaded configuration is handed to Akka.
  implicit val system: ActorSystem =
    ActorSystem("Synapse", ConfigFactory.load.getConfig("dummy"))

  /** Receives one [[Feedback]] per finished node and prints progress. */
  val notifier: ActorRef = system.actorOf(Props(new Effector(taskMark)))

  /** Creates a work node that reports `followUps` as its next verbs. */
  private def spawnNode(followUps: String*): ActorRef = {
    val next = followUps.toList
    system.actorOf(Props(new ActionCellular(notifier, next)))
  }

  val nodeE: ActorRef = spawnNode("F")
  val nodeC: ActorRef = spawnNode("F")
  val nodeB: ActorRef = spawnNode("D", "E")
  val nodeA: ActorRef = spawnNode("B", "C")

  /** Verb-to-node routing table. "D" and "F" have no entry, so the router
    * does not forward those verbs and the chain ends there.
    */
  val conditionReflection: Map[String, ActorRef] = Map(
    "A" -> nodeA,
    "B" -> nodeB,
    "C" -> nodeC,
    "E" -> nodeE
  )

  /** Router that turns verbs into messages for the matching node. */
  val trans: ActorRef = system.actorOf(Props(new TransCellular(conditionReflection)))

  /** Sends `taskMark` "A" stimuli to the router, each stamped with the
    * wall-clock time at which it was sent, then prints a confirmation.
    * Sending is fire-and-forget; this method does not wait for processing.
    */
  def Stimulate(): Unit = {
    for (i <- 1 to taskMark) {
      val localStartedTime = System.currentTimeMillis()
      trans ! Stimulus("A", i.toString, localStartedTime)
    }
    println("All jobs sent")
  }
}
/** Router actor: maps a verb to the node registered for it.
  *
  * @param conditionReflect routing table from verb to the actor handling it
  */
class TransCellular(conditionReflect: Map[String, ActorRef]) extends Actor {
  // The message types are declared inside `object Synapse`, hence the qualifier.
  def receive: Receive = {
    case msg @ Synapse.Stimulus(verb, _, _) =>
      conditionReflect.get(verb) match {
        case Some(target) => target ! msg
        // An unknown verb is reported as unhandled instead of failing the
        // actor with NoSuchElementException from Option.get.
        case None => unhandled(msg)
      }

    case Synapse.Feedback(actions, uid, start) =>
      // Each follow-up verb with a registered node becomes a new stimulus;
      // verbs without a node are skipped, which ends that branch of the task.
      for {
        verb   <- actions
        target <- conditionReflect.get(verb)
      } target ! Synapse.Stimulus(verb, uid, start)
  }
}
/** Work node: answers every stimulus with a delayed [[Synapse.Feedback]].
  *
  * @param notifier actor that receives a copy of every feedback message
  * @param actions  follow-up verbs reported in the feedback
  */
class ActionCellular(notifier: ActorRef, actions: List[String]) extends Actor {
  /** Simulated processing time before feedback is delivered. */
  private val workDelay = 500.milliseconds

  def receive: Receive = {
    case Synapse.Stimulus(_, uid, localStartTime) =>
      val feedback = Synapse.Feedback(actions, uid, localStartTime)
      val scheduler = context.system.scheduler
      // sender() is only valid while this message is being processed, so it
      // is read now rather than when the timer fires.
      val origin = sender()
      // Both deliveries use the message form of scheduleOnce, so no closure
      // runs against this actor from the scheduler thread. The executor and
      // the sender (self) are passed explicitly.
      scheduler.scheduleOnce(workDelay, notifier, feedback)(context.dispatcher, self)
      scheduler.scheduleOnce(workDelay, origin, feedback)(context.dispatcher, self)
  }
}
/** Counts feedback messages and prints progress plus a final summary.
  *
  * The run is considered finished after `taskMark * 4` feedback messages.
  *
  * @param taskMark number of tasks injected into the system
  */
class Effector(taskMark: Int) extends Actor {
  /** Wall-clock time at which this actor instance was constructed. */
  val startedTime = System.currentTimeMillis()
  /** Number of feedback messages received so far. */
  var count = 0
  // NOTE(review): never read or written in the visible code; kept so the
  // class keeps its members.
  val avgWaitMark = scala.collection.mutable.Map[String, Int]()
  /** Sum, in milliseconds, of (receive time - localStartTime) over all feedback. */
  var currentTotalWait = 0.0

  // Computed in Long so that taskMark * 4 cannot overflow Int.
  private val expectedFeedback: Long = taskMark.toLong * 4
  // Progress is printed 20 times per run; the step is at least 1 so that a
  // small taskMark cannot cause a modulo-by-zero.
  private val progressStep: Long = math.max(1L, expectedFeedback / 20)

  def receive: Receive = {
    case Synapse.Feedback(_, _, localStartTime) =>
      val now = System.currentTimeMillis()
      count += 1
      // NOTE(review): the original never updated currentTotalWait, so the
      // reported average was always 0; accumulating the elapsed time since
      // localStartTime is the presumed intent - confirm.
      currentTotalWait += now - localStartTime
      if (count % progressStep == 0)
        println("%d/%d processed".format(count, expectedFeedback))
      if (count == expectedFeedback) {
        println("Everything processed in %d seconds".format((now - startedTime) / 1000))
        // Total wait over all feedback messages divided by the number of tasks.
        println("Avg. Wait: %f".format(currentTotalWait / taskMark))
      }
  }
}
# Configuration subtree that the Scala code selects with
# ConfigFactory.load.getConfig("dummy") when creating the actor system.
# NOTE(review): the closing braces of every section are missing from this
# listing, so the nesting of the sections below cannot be confirmed; restore
# them before using this file.
dummy {
akka {
# Log levels requested for Akka's loggers and for its stdout logger.
loglevel = "DEBUG"
stdout-loglevel = "DEBUG"
# NOTE(review): presumably 0 suppresses dead-letter logging during normal
# operation while the next line keeps it on during shutdown - confirm
# against the Akka reference configuration.
log-dead-letters = 0
log-dead-letters-during-shutdown = on
actor {
default-dispatcher {
# NOTE(review): Akka reads these scheduler settings from akka.scheduler;
# confirm this section is not nested under actor.default-dispatcher.
scheduler {
tick-duration = 50ms
ticks-per-wheel = 128
# Dispatcher backed by a fork-join executor.
# NOTE(review): no actor in the visible Scala code selects simu-dispatcher
# or workers-dispatcher - confirm they are used.
simu-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
# Presumably the pool size is (available processors * parallelism-factor),
# bounded by parallelism-min and parallelism-max - confirm.
parallelism-min = 0
parallelism-max = 600
parallelism-factor = 3.0
mailbox-capacity = 100000
# Second fork-join dispatcher with a higher parallelism cap and a smaller
# mailbox capacity than simu-dispatcher.
workers-dispatcher {
mailbox-capacity = 10000
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 0
parallelism-max = 6000
parallelism-factor = 3.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment