Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Logs the mailbox size when it reaches or exceeds the configured limit. Implemented in both Scala and Java: copy one of the two versions into your project and define the configuration shown below. This code is licensed under the Apache 2 license.
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.mailbox
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import com.typesafe.config.Config
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue, ProducesMessageQueue }
import akka.event.Logging
/**
 * Mailbox type producing a [[LoggingMailbox]], a queue that reports its size through the
 * actor system's logger once the size is at or above the configured `size-limit`. Reporting
 * happens as messages are enqueued or dequeued, at most once per second.
 *
 * Configuration:
 * <pre>
 * akka.actor.default-mailbox {
 *   mailbox-type = akka.contrib.mailbox.LoggingMailboxType
 *   size-limit = 20
 * }
 * </pre>
 *
 * @param settings actor system settings; part of the constructor signature but not read here
 * @param config   mailbox configuration section; `size-limit` is read from it on every `create`
 */
class LoggingMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  /**
   * Builds the logging queue for the given owner.
   *
   * @throws IllegalArgumentException if either `owner` or `system` is empty
   */
  override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = {
    // The limit is only looked up once both the owner and the system are known to be present.
    val created = for {
      ownerRef <- owner
      actorSystem <- system
    } yield new LoggingMailbox(ownerRef, actorSystem, config.getInt("size-limit"))

    created.getOrElse(throw new IllegalArgumentException("no mailbox owner or system given"))
  }
}
/**
 * Message queue that keeps its own element counter and logs the current size, together with
 * the dequeue rate observed since the previous report, whenever the size is at or above
 * `sizeLimit`.
 *
 * Reports are emitted from `enqueue` and `dequeue`. The time of the last report is held in an
 * atomic and claimed with a compare-and-set, so that when several threads notice the elapsed
 * interval at the same time exactly one of them logs and resets the dequeue counter.
 *
 * @param owner     actor owning the mailbox; its path is included in every report
 * @param system    actor system used to obtain the logger
 * @param sizeLimit queue size at or above which the size is reported
 */
class LoggingMailbox(owner: ActorRef, system: ActorSystem, sizeLimit: Int)
  extends UnboundedMailbox.MessageQueue {

  private val interval = 1000000000L // 1 s, in nanoseconds
  private lazy val log = Logging(system, classOf[LoggingMailbox])
  private val path = owner.path.toString

  // System.nanoTime() of the last report (initially: of construction).
  private val logTime = new AtomicLong(System.nanoTime())
  private val queueSize = new AtomicInteger
  // Number of messages dequeued since the last report.
  private val dequeueCount = new AtomicInteger

  /** Removes the next envelope; a successful removal updates the counters and may log. */
  override def dequeue(): Envelope = {
    val x = super.dequeue()
    if (x ne null) {
      val size = queueSize.decrementAndGet()
      dequeueCount.incrementAndGet()
      logSize(size)
    }
    x
  }

  /** Appends the envelope, bumps the size counter and may log. */
  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    super.enqueue(receiver, handle)
    val size = queueSize.incrementAndGet()
    logSize(size)
  }

  /**
   * Logs `size` and the dequeue rate if `size` is at or above the limit and more than
   * `interval` nanoseconds have passed since the last report.
   *
   * @param size the queue size to report
   */
  def logSize(size: Int): Unit =
    if (size >= sizeLimit) {
      val now = System.nanoTime()
      // Read the previous timestamp exactly once: the same value is used for the interval
      // check, the CAS and the rate, so a concurrent update cannot skew the elapsed time.
      val last = logTime.get
      if (now - last > interval && logTime.compareAndSet(last, now)) {
        // getAndSet avoids dropping dequeues that happen between reading and resetting.
        val dequeued = dequeueCount.getAndSet(0)
        val msgPerSecond = dequeued.toDouble / ((now - last).toDouble / 1000000000L)
        log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, f"$msgPerSecond%2.2f")
      }
    }

  /** Current value of the internal size counter. */
  override def numberOfMessages: Int = queueSize.get
}
/**
* Copyright (C) 2014 Typesafe <http://typesafe.com/>
*/
package akka.contrib.mailbox;
import com.typesafe.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import akka.dispatch.UnboundedMailbox;
import akka.dispatch.UnboundedMessageQueueSemantics;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
 * Mailbox type whose queue logs its size once the size is at or above the
 * configured limit. It logs at most once per second, when messages are
 * enqueued or dequeued.
 *
 * Configuration:
 *
 * <pre>
 * akka.actor.default-mailbox {
 *   mailbox-type = akka.contrib.mailbox.LoggingMailboxType
 *   size-limit = 20
 * }
 * </pre>
 */
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> {

  private final Config config;

  /**
   * @param settings actor system settings; part of the constructor signature but not read here
   * @param config mailbox configuration section; {@code size-limit} is read from it
   */
  public LoggingMailboxType(ActorSystem.Settings settings, Config config) {
    this.config = config;
  }

  /**
   * Creates the logging queue for the given owner.
   *
   * @throws IllegalArgumentException if either {@code owner} or {@code system} is empty
   */
  @Override
  public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
    if (owner.isEmpty() || system.isEmpty())
      throw new IllegalArgumentException("no mailbox owner or system given");
    int sizeLimit = config.getInt("size-limit");
    return new LoggingMailbox(owner.get(), system.get(), sizeLimit);
  }

  /**
   * Queue backed by a {@link ConcurrentLinkedQueue} that keeps its own size
   * counter and reports the size and the dequeue rate since the previous
   * report. Declares {@link UnboundedMessageQueueSemantics} so that the queue
   * instance itself carries the unbounded-queue marker.
   */
  static class LoggingMailbox implements MessageQueue, UnboundedMessageQueueSemantics {

    private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
    private final int sizeLimit;
    private final LoggingAdapter log;
    private final long interval = 1000000000L; // 1 s, in nanoseconds
    private final String path;
    // System.nanoTime() of the last report (initially: of construction).
    private final AtomicLong logTime = new AtomicLong(System.nanoTime());
    private final AtomicInteger queueSize = new AtomicInteger();
    // Number of messages dequeued since the last report.
    private final AtomicInteger dequeueCount = new AtomicInteger();

    LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) {
      this.path = owner.path().toString();
      this.sizeLimit = sizeLimit;
      this.log = Logging.getLogger(system, LoggingMailbox.class);
    }

    /** Removes the next envelope; a successful removal updates the counters and may log. */
    @Override
    public Envelope dequeue() {
      Envelope x = queue.poll();
      if (x != null) {
        int size = queueSize.decrementAndGet();
        dequeueCount.incrementAndGet();
        logSize(size);
      }
      return x;
    }

    /** Appends the envelope, bumps the size counter and may log. */
    @Override
    public void enqueue(ActorRef receiver, Envelope handle) {
      queue.offer(handle);
      int size = queueSize.incrementAndGet();
      logSize(size);
    }

    /**
     * Logs {@code size} and the dequeue rate if {@code size} is at or above the
     * limit and more than {@code interval} nanoseconds have passed since the
     * last report.
     */
    private void logSize(int size) {
      if (size >= sizeLimit) {
        long now = System.nanoTime();
        // Read the previous timestamp exactly once: the same value feeds the
        // interval check, the CAS and the rate, so a concurrent update cannot
        // skew the elapsed time. The CAS lets only one thread report.
        long last = logTime.get();
        if (now - last > interval && logTime.compareAndSet(last, now)) {
          // getAndSet avoids dropping dequeues that happen between read and reset.
          int dequeued = dequeueCount.getAndSet(0);
          double msgPerSecond = ((double) dequeued) / (((double) (now - last)) / 1000000000L);
          log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size,
              String.format("%2.2f", msgPerSecond));
        }
      }
    }

    /** Current value of the internal size counter. */
    @Override
    public int numberOfMessages() {
      return queueSize.get();
    }

    @Override
    public boolean hasMessages() {
      return !queue.isEmpty();
    }

    /**
     * Moves every remaining envelope to {@code deadLetters}. Envelopes are taken
     * out through {@link #dequeue()}, so the backing queue ends up empty and
     * the size counter is decremented for each transferred message.
     */
    @Override
    public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
      Envelope handle = dequeue();
      while (handle != null) {
        deadLetters.enqueue(owner, handle);
        handle = dequeue();
      }
    }
  }
}
@AnatoliiStepaniuk
Copy link

AnatoliiStepaniuk commented Sep 11, 2020

Thanks @patriknw !
I've resolved the issue by adding akka.dispatch.UnboundedMessageQueueSemantics to
static class LoggingMailbox implements MessageQueue, akka.dispatch.UnboundedMessageQueueSemantics

@TJC
Copy link

TJC commented Feb 24, 2021

Hi, I wondered why cleanUp is overridden in the Scala version of the code? It looks like all it does is call super.cleanUp, so wouldn't that be the same effect as just not overriding the function?

@patriknw
Copy link
Author

patriknw commented Feb 24, 2021

@TJC I think you are right, no good reason for that override

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment