Skip to content

Instantly share code, notes, and snippets.

@ibalashov
Created April 1, 2015 13:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ibalashov/381f323ca976c3364c84 to your computer and use it in GitHub Desktop.
Save ibalashov/381f323ca976c3364c84 to your computer and use it in GitHub Desktop.
Akka ping-pong performance test with a minimized number of fork-join (FJ) threads
package akkatest
import akka.actor._
import akka.routing.RoundRobinPool
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.scalalogging.slf4j.LazyLogging
import iow.common.util.NetUtil
import net.iponweb.kafka.util.MiscUtil
import scala.util.Try
/**
 * Akka remoting ping-pong throughput test.
 *
 * The mode is selected by the number of command-line arguments:
 *  - server: `<port>` starts the actor system "systemA" with a round-robin pool of
 *    [[TestSystem.ActorA]] registered under the name "actorA";
 *  - client: `<port> <remoteActorPath> [<messages>]` starts the actor system "systemB" and
 *    sends `messages` [[TestSystem.PingObj]] messages to the selection at `remoteActorPath`.
 *
 * Every dispatcher configured here is a fork-join executor with parallelism fixed at 1.
 */
object TestSystem extends LazyLogging {

  /** Request message sent by the client side. */
  case object PingObj

  /** Reply message sent back by [[ActorA]]. */
  case object PongObj

  private val Usage: String = "Usage: TestSystem <port> [<remoteActorPath> [<messages>]]"

  /** Number of pings sent when the third argument is absent or not an integer. */
  private val DefaultMessageCount: Int = 1000000

  /** Answers every [[PingObj]] with a [[PongObj]] and counts it for throughput logging. */
  class ActorA extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case PingObj =>
        sender() ! PongObj
        measure()
    }
  }

  /** Counts every message it receives for throughput logging; never replies. */
  class ActorB extends Actor with MeasureInBatches {
    override def receive: Receive = {
      case _ => measure()
    }
  }

  /**
   * Entry point.
   *
   * @param args `<port>` for server mode, `<port> <remoteActorPath> [<messages>]` for client mode
   * @throws IllegalArgumentException if the port argument is missing or is not an integer
   */
  def main(args: Array[String]): Unit = {
    val port: Int = parsePort(args)
    val remoting: Config = remotingConfig(port, NetUtil.localIpAddress)

    // parsePort guarantees at least one argument, so a missing second one means server mode.
    args.lift(1) match {
      case None =>
        startServer(remoting)
      case Some(remoteActorPath) =>
        // Best effort, as before: an absent or malformed count falls back to the default.
        val messages: Int = args
          .lift(2)
          .flatMap(raw => Try(raw.toInt).toOption)
          .getOrElse(DefaultMessageCount)
        startClient(remoting, remoteActorPath, messages)
    }
  }

  /** Reads the mandatory first argument as a port number, failing with a usage hint. */
  private def parsePort(args: Array[String]): Int =
    args.headOption match {
      case None =>
        throw new IllegalArgumentException(s"Missing <port> argument. $Usage")
      case Some(raw) =>
        Try(raw.toInt).getOrElse(
          throw new IllegalArgumentException(s"Invalid <port> argument '$raw'. $Usage"))
    }

  /** HOCON for a dispatcher called `name` backed by a fork-join executor with parallelism 1. */
  private def dispatcherConfig(name: String): Config =
    ConfigFactory.parseString(
      s"""
         |$name {
         |  type = Dispatcher
         |  executor = "fork-join-executor"
         |  fork-join-executor {
         |    parallelism-min = 1
         |    parallelism-max = 1
         |  }
         |  throughput = 1000
         |}
       """.stripMargin)

  /** Remoting settings shared by both modes, including the "network-dispatcher" definition. */
  private def remotingConfig(port: Int, localIpAddress: String): Config =
    ConfigFactory
      .parseString(
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.port = $port
           |akka.remote.netty.tcp.hostname = $localIpAddress
           |akka.remote.use-dispatcher = "network-dispatcher"
           |akka.remote.use-dispatcher-for-io = "network-dispatcher"
           |akka.remote.netty.tcp.receive-buffer-size = 1048576b
           |akka.remote.netty.tcp.send-buffer-size = 1048576b
         """.stripMargin)
      .withFallback(dispatcherConfig("network-dispatcher"))

  /** Server mode: starts "systemA" and the "actorA" pool (one routee per available processor). */
  private def startServer(remoting: Config): Unit = {
    val config: Config = dispatcherConfig("server-dispatcher").withFallback(remoting)
    val poolSize: Int = Runtime.getRuntime.availableProcessors() // original note: 4 cores
    ActorSystem("systemA", config).actorOf(
      RoundRobinPool(poolSize).props(Props(classOf[ActorA]).withDispatcher("server-dispatcher")),
      "actorA")
    ()
  }

  /**
   * Client mode: starts "systemB", creates a pool of [[ActorB]] and sends `messages` pings to
   * `remoteActorPath`, using the pool as the sender so that replies are routed to it.
   */
  private def startClient(remoting: Config, remoteActorPath: String, messages: Int): Unit = {
    val config: Config = dispatcherConfig("client-dispatcher").withFallback(remoting)
    val actorSystem: ActorSystem = ActorSystem("systemB", config)
    val actorAsel: ActorSelection = actorSystem.actorSelection(remoteActorPath)
    val replyPool: ActorRef = actorSystem.actorOf(
      RoundRobinPool(Runtime.getRuntime.availableProcessors())
        .props(Props(classOf[ActorB]).withDispatcher("client-dispatcher")))

    // NOTE(review): MiscUtil.timeMs is defined elsewhere; presumably it times the block - confirm.
    MiscUtil.timeMs {
      (1 to messages) foreach { _ =>
        actorAsel.tell(PingObj, replyPool)
      }
      logger.info(s"Done sending: $messages messages from: $replyPool to: $actorAsel")
    }
  }
}
/**
 * Mix-in that counts calls to `measure()` and logs the throughput of each completed batch.
 *
 * `batchSz` must be non-zero, because `measure()` evaluates `cursor % batchSz`.
 */
trait MeasureInBatches extends LazyLogging {

  /** Number of `measure()` calls that make up one batch. */
  var batchSz: Long = 10000

  /** Number of `measure()` calls made so far. */
  var cursor: Long = 0

  /** Timestamp (see `getCurrentTimeMs`) taken at the latest batch boundary, if one was reached. */
  var lastTs: Option[Long] = None

  /**
   * Records one event. On every `batchSz`-th call (starting with the very first one) the
   * timestamp is refreshed, and from the second boundary onwards the throughput of the batch
   * that just finished is logged in events per second.
   */
  def measure(): Unit = {
    if (cursor % batchSz == 0) {
      for {
        previousMs <- lastTs
        nowMs <- getCurrentTimeMs
      } {
        val elapsedMs: Long = nowMs - previousMs
        // A batch finishing within the same millisecond yields Infinity (Double division).
        val throughput: Double = batchSz.toDouble / elapsedMs * 1000
        logger.info(s"$this cursor: $cursor, last batch throughput: $throughput")
      }
      // Read the clock again after logging, so logging time is not charged to the next batch.
      lastTs = getCurrentTimeMs
    }
    cursor += 1
  }

  /** Current value of `System.nanoTime()` converted to whole milliseconds. */
  def getCurrentTimeMs: Some[Long] = Some(System.nanoTime() / 1000000)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment