Skip to content

Instantly share code, notes, and snippets.

@karthik20522
Last active August 17, 2019 20:06
Show Gist options
  • Save karthik20522/0725b842fcccce32e01b84c45c5270b7 to your computer and use it in GitHub Desktop.
Save karthik20522/0725b842fcccce32e01b84c45c5270b7 to your computer and use it in GitHub Desktop.
AkkaSampling
package akka.actor
import java.io.BufferedOutputStream
import java.io.FileOutputStream
import akka.dispatch._
import java.util.concurrent.ExecutorService
import scala.concurrent.forkjoin.ForkJoinPool
import java.io.{OutputStream, Writer, OutputStreamWriter}
import TraversalHelper._
import scala.collection.mutable.SortedSet
import scala.math.Ordering
object AkkaSampling {
case class Settings(samplingPeriod: Int = 5,
maxChildren: Int = Int.MaxValue,
topSettings: TopSettings = TopSettings())
case class TopSettings(nrOfMsgThreshold: Int = -1, maxActorsToShow: Int = Int.MaxValue)
def print(as: ActorSystem, out: OutputStream = System.out)(implicit settings: Settings = Settings()) = {
val writer = new OutputStreamWriter(out)
doAsync(settings.samplingPeriod) {
doActors(as, writer)
doDispatchers(as, writer)
writer.flush
}
}
def doActors(as: ActorSystem, writer: Writer)(implicit settings: Settings) = {
implicit val topActorsOrdering = Ordering.fromLessThan[ActorRefWithCell](
(a1, a2) => a1.underlying.numberOfMessages >= a2.underlying.numberOfMessages)
val topActors = SortedSet()
def printTop = {
writer.write("\n=Top actors:=\n")
topActors.take(settings.topSettings.maxActorsToShow).foreach { actor =>
writer.write(actor.path + ": " + actor.underlying.numberOfMessages + "\n")
}
}
val ah: ActorHandler = (actor, depth) => {
val nrOfMessages = actor.underlying.numberOfMessages
writer.write(" " * depth + actor.path + ": nrOfMessages = " + nrOfMessages + "\n")
if (nrOfMessages >= settings.topSettings.nrOfMsgThreshold)
topActors += actor
}
writer.write("\n==Actors:==\n")
traverseActorTree(as)(ah)
printTop
}
def doDispatchers(as: ActorSystem, writer: Writer)(implicit settings: Settings) = {
val dh: DispatcherHandler = {
case (dispatcher, Some(pool: ForkJoinPool)) =>
writer.write(dispatcher.id + ": activeCount = " + pool.getActiveThreadCount +
", poolSize = " + pool.getPoolSize + "\n")
case (dispatcher, Some(execSrv)) => writer.write(dispatcher + " " + execSrv + "\n")
case (dispatcher, None) => writer.write(dispatcher.toString + "\n")
}
writer.write("\n==Dispatchers:==\n")
traverseDispatchers(as)(dh)
}
def doAsync(period: Int)(code: => Unit) = {
new Thread {
override def run = {
while (true) {
code
Thread.sleep(period * 1000)
}
}
}.start
}
}
package akka.actor
import akka.dispatch._
import java.util.concurrent.ExecutorService
import scala.concurrent.forkjoin.ForkJoinPool
object TraversalHelper {
type Depth = Int
type ActorHandler = (ActorRefWithCell, Depth) => Unit
type DispatcherHandler = (MessageDispatcher, Option[ExecutorService]) => Unit
def traverseActorTree(as: ActorSystem)(handler: ActorHandler) = {
def traverseSubTree(node: ActorRefWithCell, depth: Depth): Unit = {
handler(node, depth)
node.underlying.childrenRefs.children foreach { actorRef =>
traverseSubTree(actorRef.asInstanceOf[ActorRefWithCell], depth + 1)
}
}
val system = as.asInstanceOf[ActorSystemImpl]
traverseSubTree(system.guardian, 0)
}
object Reflection { // helpers to access private fields via Java Reflection
val dispConfiguratorsField = {
val f = classOf[Dispatchers].getDeclaredField("dispatcherConfigurators")
f.setAccessible(true)
f
}
val dispExecSrvGetter = {
val f = classOf[Dispatcher].getDeclaredMethod("executorService")
f.setAccessible(true)
f
}
}
def traverseDispatchers(as: ActorSystem)(handler: DispatcherHandler) = {
import Reflection._
def traverseDispatcher(dispatcher: MessageDispatcher) = {
def extractExecSrv = dispatcher match {
// default akka dispatcher
case md: Dispatcher => Some(
dispExecSrvGetter.invoke(dispatcher)
.asInstanceOf[ExecutorServiceDelegate]
.executor)
case _ => None
}
handler(dispatcher, extractExecSrv)
}
import scala.collection.JavaConverters._
val confMap = dispConfiguratorsField.get(as.dispatchers)
.asInstanceOf[java.util.concurrent.ConcurrentMap[String, MessageDispatcherConfigurator]]
.asScala
confMap.values foreach { dispConf => traverseDispatcher(dispConf.dispatcher) }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment