Skip to content

Instantly share code, notes, and snippets.

@patriknw
Last active January 5, 2023 08:12
Show Gist options
  • Star 42 You must be signed in to star a gist
  • Fork 11 You must be signed in to fork a gist
  • Save patriknw/5946678 to your computer and use it in GitHub Desktop.
Save patriknw/5946678 to your computer and use it in GitHub Desktop.
Logs the mailbox size when exceeding the configured limit. Implemented in Scala and Java. Copy one of them to your project and define the configuration. This code is licensed under the Apache 2 license.
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.mailbox
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import com.typesafe.config.Config
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue, ProducesMessageQueue }
import akka.event.Logging
/**
* Logs the mailbox size when exceeding the configured limit. It logs at most once per second
* when the messages are enqueued or dequeued.
*
* Configuration:
* <pre>
* akka.actor.default-mailbox {
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType
* size-limit = 20
* }
* </pre>
*/
class LoggingMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
case (Some(o), Some(s)) =>
val sizeLimit = config.getInt("size-limit")
val mailbox = new LoggingMailbox(o, s, sizeLimit)
mailbox
case _ => throw new IllegalArgumentException("no mailbox owner or system given")
}
}
class LoggingMailbox(owner: ActorRef, system: ActorSystem, sizeLimit: Int)
extends UnboundedMailbox.MessageQueue {
private val interval = 1000000000L // 1 s, in nanoseconds
private lazy val log = Logging(system, classOf[LoggingMailbox])
private val path = owner.path.toString
@volatile private var logTime: Long = System.nanoTime()
private val queueSize = new AtomicInteger
private val dequeueCount = new AtomicInteger
override def dequeue(): Envelope = {
val x = super.dequeue()
if (x ne null) {
val size = queueSize.decrementAndGet()
dequeueCount.incrementAndGet()
logSize(size)
}
x
}
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
super.enqueue(receiver, handle)
val size = queueSize.incrementAndGet()
logSize(size)
}
def logSize(size: Int): Unit =
if (size >= sizeLimit) {
val now = System.nanoTime()
if (now - logTime > interval) {
val msgPerSecond = dequeueCount.get.toDouble / ((now - logTime).toDouble / 1000000000L)
logTime = now
dequeueCount.set(0)
log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, f"$msgPerSecond%2.2f")
}
}
override def numberOfMessages: Int = queueSize.get
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
super.cleanUp(owner, deadLetters)
}
}
/**
* Copyright (C) 2014 Typesafe <http://typesafe.com/>
*/
package akka.contrib.mailbox;
import com.typesafe.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import akka.dispatch.UnboundedMailbox;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
* Logs the mailbox size when exceeding the configured limit. It logs at most
* once per second when the messages are enqueued or dequeued.
*
* Configuration:
*
* <pre>
* akka.actor.default-mailbox {
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType
* size-limit = 20
* }
* </pre>
*/
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> {
private final Config config;
public LoggingMailboxType(ActorSystem.Settings settings, Config config) {
this.config = config;
}
@Override
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
if (owner.isEmpty() || system.isEmpty())
throw new IllegalArgumentException("no mailbox owner or system given");
int sizeLimit = config.getInt("size-limit");
return new LoggingMailbox(owner.get(), system.get(), sizeLimit);
}
static class LoggingMailbox implements MessageQueue {
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
private final int sizeLimit;
private final LoggingAdapter log;
private final long interval = 1000000000L; // 1 s, in nanoseconds
private final String path;
volatile private long logTime = System.nanoTime();
private final AtomicInteger queueSize = new AtomicInteger();
private final AtomicInteger dequeueCount = new AtomicInteger();
LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) {
this.path = owner.path().toString();
this.sizeLimit = sizeLimit;
this.log = Logging.getLogger(system, LoggingMailbox.class);
}
@Override
public Envelope dequeue() {
Envelope x = queue.poll();
if (x != null) {
int size = queueSize.decrementAndGet();
dequeueCount.incrementAndGet();
logSize(size);
}
return x;
}
@Override
public void enqueue(ActorRef receiver, Envelope handle) {
queue.offer(handle);
int size = queueSize.incrementAndGet();
logSize(size);
}
private void logSize(int size) {
if (size >= sizeLimit) {
long now = System.nanoTime();
if (now - logTime > interval) {
double msgPerSecond = ((double) dequeueCount.get()) / (((double) (now - logTime)) / 1000000000L);
logTime = now;
dequeueCount.set(0);
log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size,
String.format("%2.2f", msgPerSecond));
}
}
}
@Override
public int numberOfMessages() {
return queueSize.get();
}
@Override
public boolean hasMessages() {
return !queue.isEmpty();
}
@Override
public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
for (Envelope handle : queue) {
deadLetters.enqueue(owner, handle);
}
}
}
}
@TJC
Copy link

TJC commented Feb 24, 2021

Hi, I wondered why cleanUp is overridden in the Scala version of the code? It looks like all it does is call super.cleanUp, so wouldn't that be the same effect as just not overriding the function?

@patriknw
Copy link
Author

@TJC I think you are right, no good reason for that override

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment