Skip to content

Instantly share code, notes, and snippets.

@RichardBradley
Last active August 29, 2015 14:16
Show Gist options
  • Save RichardBradley/fc0cc3d312d985e65794 to your computer and use it in GitHub Desktop.
Save RichardBradley/fc0cc3d312d985e65794 to your computer and use it in GitHub Desktop.
Akka drops error logs on ActorSystem guardian death - fix
.idea
myApp.log
target
package akka
import akka._
import akka.actor._
import akka.event.Logging._
import akka.event.LoggingBus
import com.typesafe.config.{ConfigValueFactory, ConfigFactory, Config}
import scala.collection.JavaConversions._
object Application {
def main(args: Array[String]): Unit = {
val system = ActorSystem("test", config)
setUpNonActorBasedLogging(system)
println("Wait a bit for everything to settle down")
Thread.sleep(500)
println("stuff the logger's mailbox")
(1 to 10000).foreach { i =>
system.log.debug("mailbox stuffing " + i)
}
println("simulate a failed assertion somewhere in the ActorSystem")
system.actorOf(Props(new FailingAssertionActor())) ! "message"
println("Wait a bit for everything to settle down, and things to flush")
Thread.sleep(500)
assert(system.isTerminated)
}
/**
* Remove the actor based logging system, and change the LoggingBus to hand
* all messages to SLF4J as soon as they arrive
*/
def setUpNonActorBasedLogging(system: ActorSystem): Unit = {
val logBus: LoggingBus = system.eventStream
val newLoggers = List(new NonActorBasedSlf4jLogger(system))
val loggersField = logBus.getClass.getDeclaredField("akka$event$LoggingBus$$loggers")
loggersField.setAccessible(true)
val oldLoggers = loggersField.get(logBus).asInstanceOf[Seq[ActorRef]]
oldLoggers.foreach(logBus.unsubscribe(_))
loggersField.set(logBus, newLoggers)
newLoggers
.foreach(logger =>
AllLogLevels.foreach(level =>
logBus.subscribe(logger, classFor(level))))
}
class FailingAssertionActor
extends Actor
{
override def receive: Receive = {
case x =>
assert(false, "it is important that this gets logged")
}
}
val config = {
System.setProperty("logback.configurationFile", "logback.xml")
ConfigFactory.load()
.withValue("akka.loglevel", ConfigValueFactory.fromAnyRef("DEBUG"))
}
}
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-experimental" % "1.0-M3",
"com.typesafe.akka" %% "akka-http-testkit-experimental" % "1.0-M3" % "test",
"com.typesafe.akka" %% "akka-slf4j" % "2.3.7",
"ch.qos.logback" % "logback-classic" % "1.1.2")
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>myApp.log</file>
<encoder>
<pattern>%date %level [%thread] %logger{10} [%file:%line] %msg%n</pattern>
</encoder>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="FILE" />
<queueSize>1024</queueSize>
<includeCallerData>false</includeCallerData>
</appender>
<root level="all">
<appender-ref ref="async" />
</root>
</configuration>
package akka
import akka.actor.{ActorRef, ActorPath, ActorSystem}
import akka.event.Logging._
import akka.event.slf4j.Logger
import akka.util.Helpers
import org.slf4j.MDC
import scala.Error
/**
* This is largely the same as [[akka.event.slf4j.Slf4jLogger]], except that:
* 1. it doesn't have an Actor's message queue, but enqueues log messages with
* log4j immediately
* 2. it's not an Actor, but an [[UnregisteredActorRef]], which doesn't need a
* live ActorSystem to receive messages
* TODO: the "stopDefaultLoggers" method from Logging will remove this actor
* from the bus, but we don't want that -- we want logging to continue
* to work after the ActorSystem is shut down.
*/
private class NonActorBasedSlf4jLogger(system: ActorSystem)
extends UnregisteredActorRef(system) {
val mdcThreadAttributeName = "sourceThread"
val mdcAkkaSourceAttributeName = "akkaSource"
val mdcAkkaTimestamp = "akkaTimestamp"
override def path: ActorPath = ActorPath.fromString("none://NonActorBasedSlf4jLogger")
override def !(msg: Any)(implicit sender: ActorRef): Unit = msg match {
case event @ Error(cause, logSource, logClass, message) ⇒
withMdc(logSource, event) {
cause match {
case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null)
case _ ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause)
}
}
case event @ Warning(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) }
case event @ Info(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) }
case event @ Debug(logSource, logClass, message) ⇒
withMdc(logSource, event) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) }
case InitializeLogger(_) ⇒
sender ! LoggerInitialized
}
@inline
final def withMdc(logSource: String, logEvent: LogEvent)(logStatement: ⇒ Unit) {
MDC.put(mdcAkkaSourceAttributeName, logSource)
MDC.put(mdcThreadAttributeName, logEvent.thread.getName)
MDC.put(mdcAkkaTimestamp, formatTimestamp(logEvent.timestamp))
logEvent.mdc foreach { case (k, v) ⇒ MDC.put(k, String.valueOf(v)) }
try logStatement finally {
MDC.remove(mdcAkkaSourceAttributeName)
MDC.remove(mdcThreadAttributeName)
MDC.remove(mdcAkkaTimestamp)
logEvent.mdc.keys.foreach(k ⇒ MDC.remove(k))
}
}
/**
* Override this method to provide a differently formatted timestamp
* @param timestamp a "currentTimeMillis"-obtained timestamp
* @return the given timestamp as a UTC String
*/
protected def formatTimestamp(timestamp: Long): String =
Helpers.currentTimeMillisToUTCString(timestamp)
}
package akka
import akka.actor._
/**
* An ActorRef which
* - offers the ability to hook caller-side logic into the message path
* - is never registered anywhere, i.e. can be GCed as soon the receiver drops it or is GCed itself
*
* CAUTION: This ActorRef is not addressable from a non-local JVM and it also breaks some otherwise
* valid invariants like `system.actorFor(ref.path.toString).equals(ref)` in the local-only context.
* It should therefore be used only in purely local environments and in consideration of the limitations.
*
* @see based on UnregisteredActorRef in Spray 1.3.1
*/
abstract class UnregisteredActorRef(
system: ActorSystem)
extends MinimalActorRef {
/**
* Subclasses should override this to determine what this pseudo-ActorRef
* should do when sent a message.
*/
override def !(message: Any)(implicit sender: ActorRef): Unit
override def path: ActorPath =
throw new IllegalStateException(
"UnregisteredActorRef does not have an ActorPath")
override def provider: ActorRefProvider =
system.asInstanceOf[ExtendedActorSystem].provider
/**
* (The default 'toString' on actors includes the 'path', which we don't have)
*/
override def toString(): String = s"[$getClass]"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment