Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Logs the mailbox size when exceeding the configured limit. Implemented in Scala and Java. Copy one of them to your project and define the configuration. This code is licensed under the Apache 2 license.
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.mailbox
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import com.typesafe.config.Config
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem }
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue, ProducesMessageQueue }
import akka.event.Logging
/**
* Logs the mailbox size when exceeding the configured limit. It logs at most once per second
* when the messages are enqueued or dequeued.
*
* Configuration:
* <pre>
* akka.actor.default-mailbox {
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType
* size-limit = 20
* }
* </pre>
*/
class LoggingMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  /**
   * Builds a [[LoggingMailbox]] for the given owner, reading the `size-limit`
   * setting from this mailbox type's configuration section.
   *
   * Both `owner` and `system` must be defined; otherwise an
   * `IllegalArgumentException` is thrown.
   */
  override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = {
    val created = for {
      ownerRef <- owner
      actorSystem <- system
    } yield {
      // The limit is looked up only once both the owner and the system are known.
      val limit = config.getInt("size-limit")
      new LoggingMailbox(ownerRef, actorSystem, limit)
    }
    created.getOrElse(throw new IllegalArgumentException("no mailbox owner or system given"))
  }
}
/**
 * Unbounded message queue that keeps its own element counter and writes the
 * current size, together with the recent dequeue rate, to the log whenever the
 * size is at or above `sizeLimit`.
 *
 * A log entry is written at most once per `interval`. The time of the last
 * entry is held in an `AtomicLong` and claimed with `compareAndSet`, so when
 * several callers reach [[logSize]] at the same moment only one of them logs
 * and resets the dequeue counter.
 *
 * `cleanUp` is deliberately not overridden: the inherited implementation is used.
 *
 * @param owner     actor owning this mailbox; only its path is used, in the log message
 * @param system    actor system used to obtain the logger
 * @param sizeLimit queue size at or above which the size is reported
 */
class LoggingMailbox(owner: ActorRef, system: ActorSystem, sizeLimit: Int)
  extends UnboundedMailbox.MessageQueue {

  private val nanosPerSecond = 1000000000L
  private val interval = nanosPerSecond // minimum time between two log entries: 1 s, in nanoseconds
  private lazy val log = Logging(system, classOf[LoggingMailbox])
  private val path = owner.path.toString

  // Timestamp (System.nanoTime) of the last log entry, or of construction.
  private val logTime = new AtomicLong(System.nanoTime())
  private val queueSize = new AtomicInteger
  // Dequeues since the last log entry. A Long, because the counter is only reset
  // when an entry is written and may therefore run for a very long time.
  private val dequeueCount = new AtomicLong

  /** Removes the next envelope, if any, updating the size and dequeue counters. */
  override def dequeue(): Envelope = {
    val x = super.dequeue()
    if (x ne null) { // null means the queue was empty: nothing to count
      val size = queueSize.decrementAndGet()
      dequeueCount.incrementAndGet()
      logSize(size)
    }
    x
  }

  /** Appends the envelope and reports the new size if it reached the limit. */
  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    super.enqueue(receiver, handle)
    val size = queueSize.incrementAndGet()
    logSize(size)
  }

  /**
   * Logs `size` and the dequeue rate since the previous entry, provided that
   * `size` is at least `sizeLimit` and more than `interval` nanoseconds have
   * passed since the previous entry.
   *
   * @param size the queue size to report
   */
  def logSize(size: Int): Unit =
    if (size >= sizeLimit) {
      val now = System.nanoTime()
      val last = logTime.get
      val elapsed = now - last
      // compareAndSet succeeds for exactly one caller per observed `last`, which
      // keeps the "at most once per interval" promise under concurrent calls.
      if (elapsed > interval && logTime.compareAndSet(last, now)) {
        // getAndSet reads and resets in one step, so no dequeue is lost in between.
        val msgPerSecond = dequeueCount.getAndSet(0L).toDouble / (elapsed.toDouble / nanosPerSecond)
        log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, f"$msgPerSecond%2.2f")
      }
    }

  /** Current size according to the internal counter. */
  override def numberOfMessages: Int = queueSize.get
}
/**
* Copyright (C) 2014 Typesafe <http://typesafe.com/>
*/
package akka.contrib.mailbox;
import com.typesafe.config.Config;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import scala.Option;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Envelope;
import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import akka.dispatch.UnboundedMailbox;
import akka.dispatch.UnboundedMessageQueueSemantics;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
* Logs the mailbox size when exceeding the configured limit. It logs at most
* once per second when the messages are enqueued or dequeued.
*
* Configuration:
*
* <pre>
* akka.actor.default-mailbox {
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType
* size-limit = 20
* }
* </pre>
*/
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> {

    private final Config config;

    /**
     * @param settings actor system settings (not used by this implementation)
     * @param config   configuration section of this mailbox; must contain {@code size-limit}
     */
    public LoggingMailboxType(ActorSystem.Settings settings, Config config) {
        this.config = config;
    }

    /**
     * Creates a {@link LoggingMailbox} for the given owner.
     *
     * @throws IllegalArgumentException if {@code owner} or {@code system} is empty
     */
    @Override
    public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) {
        if (owner.isEmpty() || system.isEmpty())
            throw new IllegalArgumentException("no mailbox owner or system given");
        int sizeLimit = config.getInt("size-limit");
        return new LoggingMailbox(owner.get(), system.get(), sizeLimit);
    }

    /**
     * Queue backed by a {@link ConcurrentLinkedQueue} that keeps its own element
     * counter and logs the size and the recent dequeue rate whenever the size is
     * at or above the limit, at most once per second.
     *
     * It implements {@link UnboundedMessageQueueSemantics} so that actors that
     * require that mailbox semantics accept this queue instead of failing with
     * an {@code ActorInitializationException}.
     */
    static class LoggingMailbox implements MessageQueue, UnboundedMessageQueueSemantics {

        private static final long NANOS_PER_SECOND = 1000000000L;
        /** Minimum time between two log entries: 1 s, in nanoseconds. */
        private static final long INTERVAL_NANOS = NANOS_PER_SECOND;

        private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>();
        private final int sizeLimit;
        private final LoggingAdapter log;
        private final String path;
        /** Timestamp (System.nanoTime) of the last log entry, or of construction. */
        private final AtomicLong logTime = new AtomicLong(System.nanoTime());
        private final AtomicInteger queueSize = new AtomicInteger();
        /**
         * Dequeues since the last log entry. A long, because the counter is only
         * reset when an entry is written.
         */
        private final AtomicLong dequeueCount = new AtomicLong();

        LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) {
            this.path = owner.path().toString();
            this.sizeLimit = sizeLimit;
            this.log = Logging.getLogger(system, LoggingMailbox.class);
        }

        /** Removes the next envelope, if any, updating the size and dequeue counters. */
        @Override
        public Envelope dequeue() {
            Envelope x = queue.poll();
            if (x != null) { // null means the queue was empty: nothing to count
                int size = queueSize.decrementAndGet();
                dequeueCount.incrementAndGet();
                logSize(size);
            }
            return x;
        }

        /** Appends the envelope and reports the new size if it reached the limit. */
        @Override
        public void enqueue(ActorRef receiver, Envelope handle) {
            queue.offer(handle);
            int size = queueSize.incrementAndGet();
            logSize(size);
        }

        /**
         * Logs {@code size} and the dequeue rate since the previous entry, provided
         * that {@code size} is at least the limit and more than one interval has
         * passed since the previous entry.
         */
        private void logSize(int size) {
            if (size < sizeLimit)
                return;
            long now = System.nanoTime();
            long last = logTime.get();
            long elapsed = now - last;
            // compareAndSet succeeds for exactly one caller per observed `last`, which
            // keeps the "at most once per interval" promise under concurrent calls.
            if (elapsed > INTERVAL_NANOS && logTime.compareAndSet(last, now)) {
                // getAndSet reads and resets in one step, so no dequeue is lost in between.
                double msgPerSecond = ((double) dequeueCount.getAndSet(0L)) / (((double) elapsed) / NANOS_PER_SECOND);
                log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size,
                        String.format("%2.2f", msgPerSecond));
            }
        }

        /** Current size according to the internal counter. */
        @Override
        public int numberOfMessages() {
            return queueSize.get();
        }

        @Override
        public boolean hasMessages() {
            return !queue.isEmpty();
        }

        /**
         * Moves every remaining envelope to {@code deadLetters}. The queue is
         * drained rather than merely iterated, so the envelopes do not stay behind
         * in this queue and the size counter stays in step with its contents.
         */
        @Override
        public void cleanUp(ActorRef owner, MessageQueue deadLetters) {
            Envelope handle;
            while ((handle = queue.poll()) != null) {
                queueSize.decrementAndGet();
                deadLetters.enqueue(owner, handle);
            }
        }
    }
}
@AnatoliiStepaniuk

This comment has been minimized.

Copy link

@AnatoliiStepaniuk AnatoliiStepaniuk commented Sep 11, 2020

Hi @patriknw !
Thanks for the code.
Could you please include in the Gist how to avoid this exception?

akka.actor.ActorInitializationException: Actor [Actor[akka://Akka/]] requires mailbox type [interface akka.dispatch.UnboundedMessageQueueSemantics] got [akka.contrib.mailbox.LoggingMailboxType$LoggingMailbox]

@patriknw

This comment has been minimized.

Copy link
Owner Author

@patriknw patriknw commented Sep 11, 2020

I tried it with a project using latest Akka 2.6.9 and it is working fine. How do you configure it? I only pasted the code into the project and added config

akka.actor.default-mailbox {
  mailbox-type = akka.contrib.mailbox.LoggingMailboxType
  size-limit = 2
}
@AnatoliiStepaniuk

This comment has been minimized.

Copy link

@AnatoliiStepaniuk AnatoliiStepaniuk commented Sep 11, 2020

Thanks @patriknw !
I've resolved the issue by adding akka.dispatch.UnboundedMessageQueueSemantics to
static class LoggingMailbox implements MessageQueue, akka.dispatch.UnboundedMessageQueueSemantics

@TJC

This comment has been minimized.

Copy link

@TJC TJC commented Feb 24, 2021

Hi, I wondered why cleanUp is overridden in the Scala version of the code? It looks like all it does is call super.cleanUp, so wouldn't that be the same effect as just not overriding the function?

@patriknw

This comment has been minimized.

Copy link
Owner Author

@patriknw patriknw commented Feb 24, 2021

@TJC I think you are right, no good reason for that override

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment