Skip to content

Instantly share code, notes, and snippets.

@aumgn
Created May 27, 2018 08:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aumgn/7c654dab801270f7da1520456351cd32 to your computer and use it in GitHub Desktop.
Save aumgn/7c654dab801270f7da1520456351cd32 to your computer and use it in GitHub Desktop.
JfrMailboxType

JfrMailboxType

package whisk.common
import java.lang.management.ManagementFactory
import java.util.concurrent.ConcurrentLinkedQueue
import javax.management.ObjectName
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch._
import akka.event.LoggerMessageQueueSemantics
import com.oracle.jrockit.jfr._
/**
 * Flight Recorder producer, value descriptors and event tokens used to trace
 * actor mailbox activity.
 *
 * Initialising this object creates the producer, the three value descriptors
 * (`path`, `queue`, `delay`), the enqueue and dequeue event tokens, and finally
 * registers the producer.
 */
case object EventTokens {

  val producer = new Producer("Actor Events", "", "http://example.org")

  /** Builds a value descriptor; all descriptors share the same placeholder description. */
  private def describe(id: String, label: String, content: ContentType, valueType: Class[_]): DynamicValue =
    new DynamicValue(id, label, "<no-description>", content, valueType)

  /**
   * Creates an instant-event token on [[producer]] carrying the given values.
   * The leading string and the two boolean flags are identical for every event
   * defined here, so they are fixed in one place.
   */
  private def instantEvent(label: String, eventPath: String, values: DynamicValue*) =
    producer.createDynamicInstantEvent("whisk/core/invoker", label, eventPath, false, true, values: _*)

  /** Descriptor for the `path` value (a String). */
  val actorPath = describe("path", "Name of actor", ContentType.None, classOf[String])

  /** Descriptor for the `queue` value (an Int). */
  val queue = describe("queue", "Size of actor's queue", ContentType.None, classOf[Int])

  /** Descriptor for the `delay` value (a Long, tagged as nanoseconds). */
  val delay = describe("delay", "Latency of message in runqueue", ContentType.Nanos, classOf[Long])

  /** Token for enqueue events: carries `path` and `queue`. */
  val enqueue = instantEvent("Actor Enqueue", "whisk/actor/enqueue", actorPath, queue)

  /** Token for dequeue events: carries `path`, `queue` and `delay`. */
  val dequeue = instantEvent("Actor Dequeue", "whisk/actor/dequeue", actorPath, queue, delay)

  // Registration happens last, once every event token above has been created.
  producer.register()
}
/**
 * Mailbox type that hands out [[JfrMessageQueue]] instances and registers each
 * one with the platform MBean server.
 *
 * @param settings actor system settings (not used by this class)
 * @param config   mailbox configuration (not used by this class)
 */
class JfrMailboxType(settings: ActorSystem.Settings, config: com.typesafe.config.Config)
    extends MailboxType
    with ProducesMessageQueue[JfrMessageQueue] {

  /**
   * Creates the message queue for `owner`.
   *
   * @param owner     the actor that will own the mailbox; must be defined
   * @param systemOpt the owning actor system (not used)
   * @return a new [[JfrMessageQueue]] bound to `owner`
   * @throws IllegalArgumentException if no owner is supplied, since the queue
   *                                  needs the actor path for events and JMX
   */
  override def create(owner: Option[ActorRef], systemOpt: Option[ActorSystem]): MessageQueue =
    owner match {
      case Some(receiver) => doCreate(receiver)
      // IllegalArgumentException is a RuntimeException, so existing handlers still match.
      case _ =>
        throw new IllegalArgumentException("JfrMailboxType cannot create a mailbox without an owning actor")
    }

  /**
   * Builds the queue for `receiver` and exposes it over JMX under
   * `whisk.actors:type=<actor path without address>`.
   *
   * @param receiver the actor that owns the new queue
   * @return the newly created, JMX-registered queue
   */
  def doCreate(receiver: ActorRef): MessageQueue = {
    val mq = new JfrMessageQueue(receiver)
    val server = ManagementFactory.getPlatformMBeanServer
    val beanName = ObjectName.getInstance(s"whisk.actors:type=${receiver.path.toStringWithoutAddress}")

    // Nothing in this file unregisters the bean, so a second mailbox for the same
    // path would make registerMBean throw InstanceAlreadyExistsException.
    // Drop the stale registration first so the newest queue wins.
    if (server.isRegistered(beanName)) {
      try server.unregisterMBean(beanName)
      catch {
        // Someone else removed it between the check and the call; nothing left to do.
        case _: javax.management.InstanceNotFoundException => ()
      }
    }

    server.registerMBean(mq, beanName)
    mq
  }
}
/**
 * Management interface implemented by `JfrMessageQueue`; instances are handed to
 * the platform MBean server in `JfrMailboxType.doCreate`.
 *
 * NOTE(review): the `<ClassName>MBean` name and `get...` accessor look like the
 * JMX standard-MBean naming convention, which would surface this as a
 * `QueueSize` attribute — confirm before renaming either the trait or the method.
 */
trait JfrMessageQueueMBean {
/** @return the number of messages currently held in the queue, as a Double */
def getQueueSize(): Double
}
/**
 * Unbounded message queue that records a Flight Recorder event on every enqueue
 * and dequeue, and reports its size through [[JfrMessageQueueMBean]].
 *
 * Each envelope is stored together with the `System.nanoTime()` reading taken
 * when it was enqueued, so that the dequeue event can report how long the
 * message waited.
 *
 * @param receiver the actor this queue belongs to; its path labels every event
 */
class JfrMessageQueue(val receiver: ActorRef)
    extends MessageQueue
    with UnboundedMessageQueueSemantics
    with LoggerMessageQueueSemantics
    with JfrMessageQueueMBean {

  /** An envelope paired with the nanoTime reading taken at enqueue. */
  case class JfrEnvelope(handle: Envelope, timestamp: Long)

  val queue = new ConcurrentLinkedQueue[JfrEnvelope]()

  // The "path" event value is declared as a String in EventTokens, so pass a
  // String rather than the ActorPath object. Computed once because the path of
  // a given ActorRef does not change.
  private[this] val receiverPath: String = receiver.path.toString

  /** @return the current number of queued messages, widened to Double */
  override def getQueueSize(): Double = queue.size()

  /** @return the current number of queued messages */
  override def numberOfMessages: Int = queue.size()

  /** @return true when at least one message is queued */
  override def hasMessages: Boolean = !queue.isEmpty

  /**
   * Records an enqueue event (with the queue size observed before insertion)
   * and then appends the envelope, stamped with the current nanoTime.
   *
   * @param r        the receiving actor as passed by the dispatcher (not used;
   *                 events are labelled with the constructor's `receiver`)
   * @param envelope the message envelope to store
   */
  override def enqueue(r: ActorRef, envelope: Envelope): Unit = {
    val event = EventTokens.enqueue.newInstantEvent()
    event.setValue("path", receiverPath)
    // NOTE: ConcurrentLinkedQueue.size traverses the queue, so this is linear
    // in the current backlog.
    event.setValue("queue", queue.size)
    event.commit()
    queue.add(JfrEnvelope(envelope, System.nanoTime()))
  }

  /**
   * Removes the head of the queue and records a dequeue event carrying the
   * remaining queue size and the time the message spent waiting.
   *
   * @return the next envelope, or null when the queue is empty
   */
  override def dequeue(): Envelope = {
    val entry = queue.poll()
    if (entry == null) {
      // The mailbox contract signals "nothing to process" with null.
      null
    } else {
      val waited = System.nanoTime() - entry.timestamp
      val event = EventTokens.dequeue.newInstantEvent()
      event.setValue("path", receiverPath)
      event.setValue("queue", queue.size)
      event.setValue("delay", waited)
      event.commit()
      entry.handle
    }
  }

  /**
   * Drains every remaining message into `deadLetters` without emitting events.
   *
   * The loop polls until the queue reports empty instead of checking `isEmpty`
   * first: a concurrent consumer could empty the queue between the check and
   * the poll, which would otherwise dereference null.
   *
   * @param owner       the actor whose mailbox is being cleaned up
   * @param deadLetters the queue that receives the undelivered messages
   */
  override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    Iterator
      .continually(queue.poll())
      .takeWhile(_ != null)
      .foreach(entry => deadLetters.enqueue(owner, entry.handle))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment