Created
May 27, 2018 08:04
-
-
Save aumgn/7c654dab801270f7da1520456351cd32 to your computer and use it in GitHub Desktop.
JfrMailboxType
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package whisk.common | |
import java.lang.management.ManagementFactory | |
import java.util.concurrent.ConcurrentLinkedQueue | |
import javax.management.ObjectName | |
import akka.actor.{ActorRef, ActorSystem} | |
import akka.dispatch._ | |
import akka.event.LoggerMessageQueueSemantics | |
import com.oracle.jrockit.jfr._ | |
case object EventTokens { | |
val producer = new Producer("Actor Events", "", "http://example.org") | |
val actorPath = new DynamicValue( | |
"path", | |
"Name of actor", | |
"<no-description>", | |
ContentType.None, | |
classOf[String]) | |
val queue = new DynamicValue( | |
"queue", | |
"Size of actor's queue", | |
"<no-description>", | |
ContentType.None, | |
classOf[Int]) | |
val delay = new DynamicValue( | |
"delay", | |
"Latency of message in runqueue", | |
"<no-description>", | |
ContentType.Nanos, | |
classOf[Long] | |
) | |
val enqueue = producer.createDynamicInstantEvent( | |
"whisk/core/invoker", | |
"Actor Enqueue", | |
"whisk/actor/enqueue", | |
false, | |
true, | |
actorPath, | |
queue) | |
val dequeue = producer.createDynamicInstantEvent( | |
"whisk/core/invoker", | |
"Actor Dequeue", | |
"whisk/actor/dequeue", | |
false, | |
true, | |
actorPath, | |
queue, | |
delay) | |
producer.register() | |
} | |
class JfrMailboxType(settings: ActorSystem.Settings, config: com.typesafe.config.Config) | |
extends MailboxType | |
with ProducesMessageQueue[JfrMessageQueue] { | |
override def create(owner: Option[ActorRef], systemOpt: Option[ActorSystem]): MessageQueue = { | |
return owner match { | |
case Some(receiver) => doCreate(receiver) | |
case _ => throw new RuntimeException() | |
} | |
} | |
def doCreate(receiver: ActorRef): MessageQueue = { | |
val mq = new JfrMessageQueue(receiver) | |
val server = ManagementFactory.getPlatformMBeanServer | |
val mxDomain = "whisk.actors:type=" + receiver.path.toStringWithoutAddress | |
server.registerMBean(mq, ObjectName.getInstance(mxDomain)) | |
mq | |
} | |
} | |
trait JfrMessageQueueMBean { | |
def getQueueSize(): Double | |
} | |
class JfrMessageQueue(val receiver: ActorRef) extends MessageQueue | |
with UnboundedMessageQueueSemantics | |
with LoggerMessageQueueSemantics | |
with JfrMessageQueueMBean { | |
case class JfrEnvelope(handle: Envelope, timestamp: Long) | |
val queue = new ConcurrentLinkedQueue[JfrEnvelope]() | |
override def getQueueSize(): Double = queue.size() | |
override def numberOfMessages: Int = queue.size() | |
override def hasMessages: Boolean = !queue.isEmpty | |
override def enqueue(r: ActorRef, envelope: Envelope): Unit = { | |
val event = EventTokens.enqueue.newInstantEvent() | |
event.setValue("path", receiver.path) | |
event.setValue("queue", queue.size) | |
event.commit() | |
queue.add(JfrEnvelope(envelope, System.nanoTime())) | |
} | |
override def dequeue(): Envelope = { | |
val envelope = queue.poll() | |
if (envelope == null) { | |
return null | |
} | |
val delay = System.nanoTime() - envelope.timestamp | |
val event = EventTokens.dequeue.newInstantEvent() | |
event.setValue("path", receiver.path) | |
event.setValue("queue", queue.size) | |
event.setValue("delay", delay) | |
event.commit() | |
envelope.handle | |
} | |
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { | |
while (!queue.isEmpty) { | |
deadLetters.enqueue(owner, queue.poll().handle) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment