Logs the mailbox size when exceeding the configured limit. Implemented in Scala and Java. Copy one of them to your project and define the configuration. This code is licensed under the Apache 2 license.
/** | |
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> | |
*/ | |
package akka.contrib.mailbox | |
import scala.concurrent.duration._ | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.atomic.AtomicLong | |
import com.typesafe.config.Config | |
import akka.actor.{ ActorContext, ActorRef, ActorSystem, ExtendedActorSystem } | |
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedMailbox, UnboundedQueueBasedMessageQueue, ProducesMessageQueue } | |
import akka.event.Logging | |
/** | |
* Logs the mailbox size when exceeding the configured limit. It logs at most once per second | |
* when the messages are enqueued or dequeued. | |
* | |
* Configuration: | |
* <pre> | |
* akka.actor.default-mailbox { | |
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType | |
* size-limit = 20 | |
* } | |
* </pre> | |
*/ | |
/**
 * Mailbox type that produces a [[LoggingMailbox]] for each actor.
 *
 * @param settings actor system settings; not read by this class (presumably part of the
 *                 constructor signature the mailbox loader expects — confirm)
 * @param config   the mailbox configuration section; must contain an integer `size-limit`
 */
class LoggingMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType with ProducesMessageQueue[UnboundedMailbox.MessageQueue] {

  /**
   * Builds the logging queue for the given actor.
   *
   * @throws IllegalArgumentException when either the owner or the system is missing
   */
  override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = {
    // `size-limit` is only looked up once both the owner and the system are known.
    val maybeQueue =
      for {
        actor <- owner
        actorSystem <- system
      } yield new LoggingMailbox(actor, actorSystem, config.getInt("size-limit"))

    maybeQueue.getOrElse(throw new IllegalArgumentException("no mailbox owner or system given"))
  }
}
/**
 * Unbounded message queue that tracks its own size and logs it, together with the
 * observed dequeue rate, whenever the size is at or above `sizeLimit`.
 *
 * At most one log statement is emitted per interval (one second), even when
 * enqueue and dequeue run concurrently on several threads.
 *
 * @param owner     the actor owning this mailbox; its path is included in the log line
 * @param system    the actor system used to obtain the logger
 * @param sizeLimit queue size at or above which the size is logged
 */
class LoggingMailbox(owner: ActorRef, system: ActorSystem, sizeLimit: Int)
  extends UnboundedMailbox.MessageQueue {

  private val interval = 1000000000L // 1 s, in nanoseconds
  private lazy val log = Logging(system, classOf[LoggingMailbox])
  private val path = owner.path.toString

  // Time of the last log statement (System.nanoTime). Kept in an AtomicLong so that a
  // compare-and-set can elect exactly one thread to log per interval.
  private val logTime = new AtomicLong(System.nanoTime())
  // Number of queued messages, maintained on enqueue/dequeue.
  private val queueSize = new AtomicInteger
  // Messages dequeued since the last log statement; used for the msg/s figure.
  private val dequeueCount = new AtomicInteger

  /** Removes the next envelope (or returns null when empty) and updates the counters. */
  override def dequeue(): Envelope = {
    val x = super.dequeue()
    if (x ne null) {
      val size = queueSize.decrementAndGet()
      dequeueCount.incrementAndGet()
      logSize(size)
    }
    x
  }

  /** Appends the envelope and updates the size counter. */
  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    super.enqueue(receiver, handle)
    val size = queueSize.incrementAndGet()
    logSize(size)
  }

  /**
   * Logs `size` and the dequeue rate if `size` has reached the limit and more than one
   * interval has passed since the previous log statement.
   *
   * @param size the queue size observed by the caller
   */
  def logSize(size: Int): Unit =
    if (size >= sizeLimit) {
      val now = System.nanoTime()
      val last = logTime.get
      // The CAS guarantees that only one of several racing threads wins the right to log
      // for this interval; the losers simply skip logging.
      if (now - last > interval && logTime.compareAndSet(last, now)) {
        // getAndSet avoids losing dequeues counted between a separate get and set(0).
        val dequeued = dequeueCount.getAndSet(0)
        val msgPerSecond = dequeued.toDouble / ((now - last).toDouble / interval)
        log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, f"$msgPerSecond%2.2f")
      }
    }

  /** Current number of queued messages, read from the counter. */
  override def numberOfMessages: Int = queueSize.get

  // cleanUp is intentionally not overridden: the previous override only delegated to super.
}
/** | |
* Copyright (C) 2014 Typesafe <http://typesafe.com/> | |
*/ | |
package akka.contrib.mailbox; | |
import com.typesafe.config.Config; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import scala.Option; | |
import akka.actor.ActorRef; | |
import akka.actor.ActorSystem; | |
import akka.dispatch.Envelope; | |
import akka.dispatch.MailboxType; | |
import akka.dispatch.MessageQueue; | |
import akka.dispatch.ProducesMessageQueue; | |
import akka.dispatch.UnboundedMailbox; | |
import akka.event.Logging; | |
import akka.event.LoggingAdapter; | |
/** | |
* Logs the mailbox size when exceeding the configured limit. It logs at most | |
* once per second when the messages are enqueued or dequeued. | |
* | |
* Configuration: | |
* | |
* <pre> | |
* akka.actor.default-mailbox { | |
* mailbox-type = akka.contrib.mailbox.LoggingMailboxType | |
* size-limit = 20 | |
* } | |
* </pre> | |
*/ | |
public class LoggingMailboxType implements MailboxType, ProducesMessageQueue<UnboundedMailbox.MessageQueue> { | |
private final Config config; | |
public LoggingMailboxType(ActorSystem.Settings settings, Config config) { | |
this.config = config; | |
} | |
@Override | |
public MessageQueue create(Option<ActorRef> owner, Option<ActorSystem> system) { | |
if (owner.isEmpty() || system.isEmpty()) | |
throw new IllegalArgumentException("no mailbox owner or system given"); | |
int sizeLimit = config.getInt("size-limit"); | |
return new LoggingMailbox(owner.get(), system.get(), sizeLimit); | |
} | |
static class LoggingMailbox implements MessageQueue { | |
private final Queue<Envelope> queue = new ConcurrentLinkedQueue<Envelope>(); | |
private final int sizeLimit; | |
private final LoggingAdapter log; | |
private final long interval = 1000000000L; // 1 s, in nanoseconds | |
private final String path; | |
volatile private long logTime = System.nanoTime(); | |
private final AtomicInteger queueSize = new AtomicInteger(); | |
private final AtomicInteger dequeueCount = new AtomicInteger(); | |
LoggingMailbox(ActorRef owner, ActorSystem system, int sizeLimit) { | |
this.path = owner.path().toString(); | |
this.sizeLimit = sizeLimit; | |
this.log = Logging.getLogger(system, LoggingMailbox.class); | |
} | |
@Override | |
public Envelope dequeue() { | |
Envelope x = queue.poll(); | |
if (x != null) { | |
int size = queueSize.decrementAndGet(); | |
dequeueCount.incrementAndGet(); | |
logSize(size); | |
} | |
return x; | |
} | |
@Override | |
public void enqueue(ActorRef receiver, Envelope handle) { | |
queue.offer(handle); | |
int size = queueSize.incrementAndGet(); | |
logSize(size); | |
} | |
private void logSize(int size) { | |
if (size >= sizeLimit) { | |
long now = System.nanoTime(); | |
if (now - logTime > interval) { | |
double msgPerSecond = ((double) dequeueCount.get()) / (((double) (now - logTime)) / 1000000000L); | |
logTime = now; | |
dequeueCount.set(0); | |
log.info("Mailbox size for [{}] is [{}], processing [{}] msg/s", path, size, | |
String.format("%2.2f", msgPerSecond)); | |
} | |
} | |
} | |
@Override | |
public int numberOfMessages() { | |
return queueSize.get(); | |
} | |
@Override | |
public boolean hasMessages() { | |
return !queue.isEmpty(); | |
} | |
@Override | |
public void cleanUp(ActorRef owner, MessageQueue deadLetters) { | |
for (Envelope handle : queue) { | |
deadLetters.enqueue(owner, handle); | |
} | |
} | |
} | |
} |
This comment has been minimized.
This comment has been minimized.
I tried it with a project using the latest Akka (2.6.9) and it is working fine. How do you configure it? I only pasted the code into the project and added the config
|
This comment has been minimized.
This comment has been minimized.
Thanks @patriknw ! |
This comment has been minimized.
This comment has been minimized.
Hi, I wondered why |
This comment has been minimized.
This comment has been minimized.
@TJC I think you are right, no good reason for that override |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Hi @patriknw !
Thanks for the code.
Could you please include in the Gist how to avoid this exception?
akka.actor.ActorInitializationException: Actor [Actor[akka://Akka/]] requires mailbox type [interface akka.dispatch.UnboundedMessageQueueSemantics] got [akka.contrib.mailbox.LoggingMailboxType$LoggingMailbox]