Created
April 1, 2015 13:10
-
-
Save ibalashov/381f323ca976c3364c84 to your computer and use it in GitHub Desktop.
Akka ping-pong perf test with minimized FJ threads
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akkatest | |
import akka.actor._ | |
import akka.routing.RoundRobinPool | |
import com.typesafe.config.{Config, ConfigFactory} | |
import com.typesafe.scalalogging.slf4j.LazyLogging | |
import iow.common.util.NetUtil | |
import net.iponweb.kafka.util.MiscUtil | |
import scala.util.Try | |
/**
 * Akka remoting ping-pong throughput test.
 *
 * The program runs in one of two modes, chosen by the number of command-line arguments:
 *
 *  - `<port>`: server mode. Starts actor system `systemA` with a round-robin pool of
 *    [[TestSystem.ActorA]] routees registered under the name `actorA`.
 *  - `<port> <remoteActorPath> [<messages>]`: client mode. Starts actor system `systemB`
 *    and sends `messages` [[TestSystem.PingObj]] messages to the actor selection
 *    `remoteActorPath`, using a round-robin pool of [[TestSystem.ActorB]] routees as the sender.
 *
 * Every dispatcher configured here (remoting, server, client) is a fork-join dispatcher with
 * `parallelism-min` and `parallelism-max` both set to 1.
 */
object TestSystem extends LazyLogging {

  /** Request message sent by the client to the remote actor selection. */
  case object PingObj

  /** Reply message that [[ActorA]] sends back to the sender of each [[PingObj]]. */
  case object PongObj

  private val Usage: String = "Usage: TestSystem <port> [<remoteActorPath> [<messages>]]"

  /** Number of pings sent when the optional third argument is missing or not an integer. */
  private val DefaultMessageCount: Int = 1000000

  /** Server-side actor: answers every [[PingObj]] with a [[PongObj]] and records the message. */
  class ActorA extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case PingObj =>
        sender() ! PongObj
        measure()
    }
  }

  /** Client-side actor: records every message it receives, whatever its type. */
  class ActorB extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case _ => measure()
    }
  }

  /**
   * Builds the configuration of a fork-join dispatcher limited to a single thread.
   *
   * @param name top-level configuration key under which the dispatcher is defined
   */
  private def dispatcherConfig(name: String): Config = ConfigFactory.parseString(
    s"""
       |$name {
       |  type = Dispatcher
       |  executor = "fork-join-executor"
       |  fork-join-executor {
       |    parallelism-min = 1
       |    parallelism-max = 1
       |  }
       |  throughput = 1000
       |}
       |""".stripMargin)

  /**
   * Builds the remoting configuration shared by the server and the client.
   *
   * @param port     TCP port the remoting transport binds to
   * @param hostname host name or address advertised by the remoting transport
   */
  private def remotingConfig(port: Int, hostname: String): Config = ConfigFactory.parseString(
    // The hostname is quoted so that values containing ':' (e.g. IPv6 addresses), which are
    // not legal in unquoted HOCON strings, still parse.
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.port = $port
       |akka.remote.netty.tcp.hostname = "$hostname"
       |akka.remote.use-dispatcher = "network-dispatcher"
       |akka.remote.use-dispatcher-for-io = "network-dispatcher"
       |akka.remote.netty.tcp.receive-buffer-size = 1048576b
       |akka.remote.netty.tcp.send-buffer-size = 1048576b
       |""".stripMargin).withFallback(dispatcherConfig("network-dispatcher"))

  /**
   * Reads the optional message count from the third command-line argument.
   *
   * @return the parsed count, or [[DefaultMessageCount]] when the argument is absent or
   *         is not a valid integer (the latter case is logged as a warning)
   */
  private def messageCount(args: Array[String]): Int =
    args.drop(2).headOption match {
      case None => DefaultMessageCount
      case Some(raw) =>
        Try(raw.toInt).getOrElse {
          logger.warn(s"Invalid message count '$raw', falling back to $DefaultMessageCount")
          DefaultMessageCount
        }
    }

  /** Starts `systemA` and registers the `actorA` round-robin pool on the server dispatcher. */
  private def startServer(commonConfig: Config): Unit = {
    val system: ActorSystem =
      ActorSystem("systemA", dispatcherConfig("server-dispatcher").withFallback(commonConfig))
    // One routee per available processor (the original author noted a 4-core machine).
    system.actorOf(
      RoundRobinPool(Runtime.getRuntime.availableProcessors())
        .props(Props(classOf[ActorA]).withDispatcher("server-dispatcher")),
      "actorA")
  }

  /**
   * Starts `systemB` and sends `messages` pings to `remoteActorPath`.
   *
   * @param commonConfig    remoting configuration shared with the server
   * @param remoteActorPath actor path handed to `actorSelection`
   * @param messages        number of [[PingObj]] messages to send
   */
  private def runClient(commonConfig: Config, remoteActorPath: String, messages: Int): Unit = {
    val actorSystem: ActorSystem =
      ActorSystem("systemB", dispatcherConfig("client-dispatcher").withFallback(commonConfig))
    val actorAsel: ActorSelection = actorSystem.actorSelection(remoteActorPath)
    // Pool of ActorB routees used as the sender of every ping.
    val actor: ActorRef = actorSystem.actorOf(
      RoundRobinPool(Runtime.getRuntime.availableProcessors())
        .props(Props(classOf[ActorB]).withDispatcher("client-dispatcher")))

    // NOTE(review): MiscUtil.timeMs is defined outside this file; presumably it times the
    // enclosed block. Its result is discarded here, as in the original code.
    MiscUtil.timeMs {
      (1 to messages).foreach { _ =>
        actorAsel.tell(PingObj, actor)
      }
      logger.info(s"Done sending: $messages messages from: $actor to: $actorAsel")
    }
  }

  /**
   * Entry point.
   *
   * @param args `<port>` for server mode, or `<port> <remoteActorPath> [<messages>]` for
   *             client mode
   * @throws IllegalArgumentException if no arguments are given
   * @throws NumberFormatException    if `<port>` is not an integer
   */
  def main(args: Array[String]): Unit = {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    require(args.nonEmpty, Usage)

    val port: Int = args(0).toInt
    val commonRemotingConfig: Config = remotingConfig(port, NetUtil.localIpAddress)

    args.drop(1).headOption match {
      case None =>
        startServer(commonRemotingConfig)
      case Some(remoteActorPath) =>
        runClient(commonRemotingConfig, remoteActorPath, messageCount(args))
    }
  }
}
/**
 * Mixin that counts calls to `measure()` and logs the throughput of each completed batch
 * of `batchSz` calls.
 *
 * State is kept in plain `var`s with no synchronization.
 */
trait MeasureInBatches extends LazyLogging {

  /** Number of `measure()` calls that make up one batch. */
  var batchSz: Long = 10000

  /** Number of `measure()` calls made so far. */
  var cursor: Long = 0

  /** Timestamp (in the units of `getCurrentTimeMs`) taken at the last batch boundary, if any. */
  var lastTs: Option[Long] = None

  /**
   * Records one event.
   *
   * Whenever `cursor` is a multiple of `batchSz`, logs the throughput (events per second) of
   * the batch that just ended and restarts the clock. The first call only starts the clock.
   * Nothing is reported while `batchSz` is not positive.
   */
  def measure(): Unit = {
    // batchSz > 0 also protects the modulo below from dividing by zero.
    if (batchSz > 0 && cursor % batchSz == 0) {
      for {
        previousMs <- lastTs
        nowMs <- getCurrentTimeMs
      } {
        val elapsedMs: Long = nowMs - previousMs
        // Floating-point division: an elapsed time of 0 ms yields Infinity rather than throwing.
        val throughput: Double = batchSz.toDouble / elapsedMs * 1000
        logger.info(s"$this cursor: $cursor, last batch throughput: $throughput")
      }
      // Re-read the clock after logging so the time spent logging is not charged to the
      // next batch.
      lastTs = getCurrentTimeMs
    }
    cursor += 1
  }

  /** Current value of `System.nanoTime()` converted to whole milliseconds. */
  def getCurrentTimeMs: Some[Long] = Some(System.nanoTime() / 1000000)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment