Created
November 4, 2011 17:24
-
-
Save mariusae/1339915 to your computer and use it in GitHub Desktop.
Monitors for Finagle.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala | |
index 1df9c58..32663db 100644 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala | |
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala | |
@@ -56,7 +56,7 @@ import org.jboss.netty.channel.socket.nio._ | |
import org.jboss.netty.handler.ssl._ | |
import org.jboss.netty.handler.timeout.IdleStateHandler | |
-import com.twitter.util.{Future, Duration, Throw, Return} | |
+import com.twitter.util.{Future, Duration, Throw, Return, Monitor, NullMonitor} | |
import com.twitter.util.TimeConversions._ | |
import com.twitter.finagle.channel._ | |
@@ -65,6 +65,7 @@ import com.twitter.finagle.pool._ | |
import com.twitter.finagle._ | |
import com.twitter.finagle.service._ | |
import com.twitter.finagle.factory._ | |
+import com.twitter.finagle.filter.MonitorFilter | |
import com.twitter.finagle.stats.{StatsReceiver, RollupStatsReceiver, NullStatsReceiver, GlobalStatsReceiver} | |
import com.twitter.finagle.loadbalancer.{LoadBalancedFactory, LeastQueuedStrategy, HeapBalancer} | |
import com.twitter.finagle.ssl.{Ssl, SslConnectHandler} | |
@@ -150,7 +151,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL | |
private val _readerIdleTimeout : Option[Duration] = None, | |
private val _writerIdleTimeout : Option[Duration] = None, | |
private val _statsReceiver : Option[StatsReceiver] = None, | |
- private val _exceptionReceiver : Option[ClientExceptionReceiverBuilder] = None, | |
+ private val _monitor : Option[String => Monitor] = None, | |
private val _name : Option[String] = Some("client"), | |
private val _sendBufferSize : Option[Int] = None, | |
private val _recvBufferSize : Option[Int] = None, | |
@@ -176,7 +177,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL | |
val connectTimeout = _connectTimeout | |
val timeout = _timeout | |
val statsReceiver = _statsReceiver | |
- val exceptionReceiver = _exceptionReceiver | |
+ val monitor = _monitor | |
val keepAlive = _keepAlive | |
val readerIdleTimeout = _readerIdleTimeout | |
val writerIdleTimeout = _writerIdleTimeout | |
@@ -208,7 +209,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL | |
"readerIdleTimeout" -> Some(_readerIdleTimeout), | |
"writerIdleTimeout" -> Some(_writerIdleTimeout), | |
"statsReceiver" -> _statsReceiver, | |
- "exceptionReceiver" -> _exceptionReceiver, | |
+ "monitor" -> _monitor, | |
"name" -> _name, | |
"hostConnectionCoresize" -> _hostConfig.hostConnectionCoresize, | |
"hostConnectionLimit" -> _hostConfig.hostConnectionLimit, | |
@@ -511,8 +512,8 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv | |
def tracer(tracer: Tracer): This = | |
withConfig(_.copy(_tracerFactory = () => tracer)) | |
- def exceptionReceiver(erFactory: ClientExceptionReceiverBuilder): This = | |
- withConfig(_.copy(_exceptionReceiver = Some(erFactory))) | |
+ def monitor(mFactory: String => Monitor): This = | |
+ withConfig(_.copy(_monitor = Some(mFactory))) | |
/** | |
* Log very detailed debug information to the given logger. | |
@@ -664,7 +665,6 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv | |
) | |
var factory: ServiceFactory[Req, Rep] = null | |
- | |
val bs = buildBootstrap(codec, host) | |
factory = new ChannelServiceFactory[Req, Rep]( | |
bs, prepareService(codec) _, hostStatsReceiver) | |
@@ -682,15 +682,14 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv | |
factory = new FailureAccrualFactory(factory, numFailures, markDeadFor) | |
} | |
- val exceptionFilter = new ExceptionFilter[Req, Rep]( | |
- config.exceptionReceiver map { | |
- _(config.name.get) | |
- } getOrElse { | |
- NullExceptionReceiver | |
- } | |
- ) | |
val statsFilter = new StatsFilter[Req, Rep](hostStatsReceiver) | |
- factory = exceptionFilter andThen statsFilter andThen factory | |
+ val monitorFilter = new MonitorFilter[Req, Rep]({ | |
+ config.monitor map { mf => | |
+ mf(config.name.get) | |
+ } getOrElse NullMonitor | |
+ }) | |
+ | |
+ factory = monitorFilter andThen statsFilter andThen factory | |
factory | |
} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala | |
index 6e1e8cb..8b2b8b0 100644 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala | |
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala | |
@@ -53,7 +53,7 @@ import org.jboss.netty.channel.socket.nio._ | |
import org.jboss.netty.handler.ssl._ | |
import org.jboss.netty.handler.timeout.ReadTimeoutHandler | |
-import com.twitter.util.Duration | |
+import com.twitter.util.{Duration, Monitor, NullMonitor} | |
import com.twitter.conversions.time._ | |
import com.twitter.finagle._ | |
@@ -121,7 +121,7 @@ object ServerConfig { | |
final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName]( | |
private val _codecFactory: Option[CodecFactory[Req, Rep]#Server] = None, | |
private val _statsReceiver: Option[StatsReceiver] = None, | |
- private val _exceptionReceiver: Option[ServerExceptionReceiverBuilder] = None, | |
+ private val _monitor: Option[(String, SocketAddress) => Monitor] = None, | |
private val _name: Option[String] = None, | |
private val _sendBufferSize: Option[Int] = None, | |
private val _recvBufferSize: Option[Int] = None, | |
@@ -151,7 +151,7 @@ final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName]( | |
*/ | |
val codecFactory = _codecFactory | |
val statsReceiver = _statsReceiver | |
- val exceptionReceiver = _exceptionReceiver | |
+ val monitor = _monitor | |
val name = _name | |
val sendBufferSize = _sendBufferSize | |
val recvBufferSize = _recvBufferSize | |
@@ -175,7 +175,7 @@ final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName]( | |
def toMap = Map( | |
"codecFactory" -> _codecFactory, | |
"statsReceiver" -> _statsReceiver, | |
- "exceptionReceiver" -> _exceptionReceiver, | |
+ "monitor" -> _monitor, | |
"name" -> _name, | |
"sendBufferSize" -> _sendBufferSize, | |
"recvBufferSize" -> _recvBufferSize, | |
@@ -317,8 +317,8 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder]( | |
def writeCompletionTimeout(howlong: Duration): This = | |
withConfig(_.copy(_writeCompletionTimeout = Some(howlong))) | |
- def exceptionReceiver(erFactory: ServerExceptionReceiverBuilder): This = | |
- withConfig(_.copy(_exceptionReceiver = Some(erFactory))) | |
+ def monitor(mFactory: (String, SocketAddress) => Monitor): This = | |
+ withConfig(_.copy(_monitor = Some(mFactory))) | |
def tracerFactory(factory: Tracer.Factory): This = | |
withConfig(_.copy(_tracerFactory = factory)) | |
@@ -495,17 +495,7 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder]( | |
new ProxyService(postponedService flatMap { s => codec.prepareService(s) }) | |
} | |
- // Add the exception service at the bottom layer | |
- // This is not required, but argubably the best style | |
- val exceptionFilter = new ExceptionFilter[Req, Rep] ( | |
- config.exceptionReceiver map { | |
- _(config.name.get, config.bindTo.get) | |
- } getOrElse { | |
- NullExceptionReceiver | |
- } | |
- ) | |
- | |
- service = exceptionFilter andThen service | |
+ // service = exceptionFilter andThen service | |
statsFilter foreach { sf => | |
service = sf andThen service | |
@@ -547,9 +537,16 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder]( | |
// one here. | |
service = (new TracingFilter(tracer)) andThen service | |
+ val monitor = config.monitor map { | |
+ _(config.name.get, config.bindTo.get) | |
+ } getOrElse { | |
+ NullMonitor | |
+ } | |
+ | |
val channelHandler = new ServiceToChannelHandler( | |
service, postponedService, serviceFactory, | |
- scopedOrNullStatsReceiver, Logger.getLogger(getClass.getName)) | |
+ scopedOrNullStatsReceiver, Logger.getLogger(getClass.getName), | |
+ monitor) | |
/* | |
* Register the channel so we can wait for them for a drain. We close the socket but wait | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala | |
index 30824a6..4b89173 100644 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala | |
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala | |
@@ -9,7 +9,9 @@ import org.jboss.netty.channel.{ | |
SimpleChannelUpstreamHandler, ExceptionEvent, | |
ChannelStateEvent} | |
-import com.twitter.util.{Future, Promise, Throw, Try, Time, Return} | |
+import com.twitter.util.{ | |
+ Future, Promise, Throw, Try, Time, Return, | |
+ Monitor, NullMonitor} | |
import com.twitter.finagle._ | |
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala | |
index 877c899..a726472 100644 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala | |
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala | |
@@ -2,13 +2,16 @@ package com.twitter.finagle.channel | |
import java.util.concurrent.atomic.AtomicReference | |
import java.util.logging.{Level, Logger} | |
+ | |
+import org.jboss.netty.channel._ | |
+import org.jboss.netty.handler.timeout.ReadTimeoutException | |
+ | |
+import com.twitter.util.{Future, Promise, Return, Throw, Monitor} | |
+ | |
import com.twitter.finagle.{ClientConnection, CodecException, Service, WriteTimedOutException} | |
import com.twitter.finagle.util.Conversions._ | |
import com.twitter.finagle.service.ProxyService | |
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver} | |
-import com.twitter.util.{Future, Promise, Return, Throw} | |
-import org.jboss.netty.channel._ | |
-import org.jboss.netty.handler.timeout.ReadTimeoutException | |
private[finagle] object ServiceToChannelHandler { | |
// valid transitions are: | |
@@ -26,7 +29,8 @@ private[finagle] class ServiceToChannelHandler[Req, Rep]( | |
postponedService: Promise[Service[Req, Rep]], | |
serviceFactory: (ClientConnection) => Service[Req, Rep], | |
statsReceiver: StatsReceiver, | |
- log: Logger) | |
+ log: Logger, | |
+ parentMonitor: Monitor) | |
extends ChannelClosingHandler with ConnectionLifecycleHandler | |
{ | |
import ServiceToChannelHandler._ | |
@@ -34,6 +38,13 @@ private[finagle] class ServiceToChannelHandler[Req, Rep]( | |
private[this] val state = new AtomicReference[State](Idle) | |
private[this] val onShutdownPromise = new Promise[Unit] | |
+ private[this] val monitor = | |
+ parentMonitor andThen new Monitor { | |
+ def handle(cause: Throwable) = { | |
+ shutdown() | |
+ true | |
+ } | |
+ } | |
// we know there's only one outstanding request at a time because | |
// ServerBuilder adds it in a separate layer. | |
@@ -68,7 +79,10 @@ private[finagle] class ServiceToChannelHandler[Req, Rep]( | |
} while (continue) | |
} | |
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) { | |
+ override def messageReceived( | |
+ ctx: ChannelHandlerContext, | |
+ e: MessageEvent | |
+ ): Unit = Monitor.using(monitor) { | |
val channel = ctx.getChannel | |
val message = e.getMessage | |
@@ -87,26 +101,17 @@ private[finagle] class ServiceToChannelHandler[Req, Rep]( | |
case _ => /* let these fall on the floor */ return | |
} | |
- try { | |
- val promise = service(message.asInstanceOf[Req]) | |
+ Monitor.Try { | |
+ service(message.asInstanceOf[Req]) | |
+ } foreach { promise => | |
currentResponse = Some(promise) | |
promise respond { | |
case Return(value) => | |
currentResponse = None | |
Channels.write(channel, value) | |
- case Throw(e: Throwable) => | |
- log.log(Level.WARNING, "service exception", e) | |
- shutdown() | |
+ case Throw(e) => | |
+ Monitor.handle(e) | |
} | |
- } catch { | |
- case e: ClassCastException => | |
- log.log( | |
- Level.SEVERE, | |
- "Got ClassCastException while processing a " + | |
- "message. This is a codec bug. %s".format(e)) | |
- shutdown() | |
- case e => | |
- Channels.fireExceptionCaught(channel, e) | |
} | |
} | |
@@ -146,29 +151,17 @@ private[finagle] class ServiceToChannelHandler[Req, Rep]( | |
*/ | |
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) { | |
val cause = e.getCause | |
- val level = cause match { | |
- case e: java.nio.channels.ClosedChannelException => | |
- Level.FINEST | |
+ | |
+ cause match { | |
+ //XXX: should these be handled by the monitor too? | |
case e: ReadTimeoutException => | |
statsReceiver.counter("read_timeout").incr() | |
- Level.FINEST | |
case e: WriteTimedOutException => | |
statsReceiver.counter("write_timeout").incr() | |
- Level.FINEST | |
- case e: java.io.IOException | |
- if (e.getMessage == "Connection reset by peer" || | |
- e.getMessage == "Broken pipe" || | |
- e.getMessage == "Connection timed out" || | |
- e.getMessage == "No route to host") => | |
- Level.FINEST | |
- case e: javax.net.ssl.SSLException => | |
- Level.FINEST | |
- case e: Throwable => | |
- Level.WARNING | |
+ case _ => | |
+ () | |
} | |
- log.log(level, "Exception caught by service channel handler", cause) | |
- | |
- shutdown() | |
+ monitor.handle(cause) | |
} | |
} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala | |
deleted file mode 100644 | |
index 27b1781..0000000 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala | |
+++ /dev/null | |
@@ -1,23 +0,0 @@ | |
-package com.twitter.finagle.exception | |
- | |
-import com.twitter.finagle.{Service, SimpleFilter} | |
-import com.twitter.util.Future | |
-import com.twitter.finagle.tracing.Trace | |
- | |
-/** | |
- * A filter that resides in both the client- and server-side finagle service stacks and | |
- * passes the Throwable that underlies Throws to an ExceptionReceiver. | |
- * | |
- * It resides in the service stack under all circumstances. If this feature is undesired, the | |
- * filter will send the exceptional values to a NullExceptionReceiver. | |
- */ | |
-class ExceptionFilter[Req, Rep](exceptionReceiver: ExceptionReceiver) | |
- extends SimpleFilter[Req, Rep] | |
-{ | |
- def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = { | |
- service(request) onFailure { e => | |
- exceptionReceiver.receive(e) | |
- Trace.recordBinary("finagle.exception", e.toString) | |
- } | |
- } | |
-} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala | |
deleted file mode 100644 | |
index 71dda6a..0000000 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala | |
+++ /dev/null | |
@@ -1,13 +0,0 @@ | |
-package com.twitter.finagle.exception | |
- | |
-/** | |
- * A generic interface for receiving exceptions from Future.throw values | |
- * in the finagle service stack. | |
- */ | |
-trait ExceptionReceiver { | |
- def receive(e: Throwable) | |
-} | |
- | |
-object NullExceptionReceiver extends ExceptionReceiver { | |
- def receive(e: Throwable) {} | |
-} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala | |
deleted file mode 100644 | |
index 22a57e3..0000000 | |
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala | |
+++ /dev/null | |
@@ -1,15 +0,0 @@ | |
-package com.twitter.finagle | |
- | |
-import java.net.SocketAddress | |
- | |
-package object exception { | |
- /** | |
- * A client is passed the service name | |
- */ | |
- type ClientExceptionReceiverBuilder = String => ExceptionReceiver | |
- | |
- /** | |
- * A server factory is passed the service name and the SocketAddress that it is bound to | |
- */ | |
- type ServerExceptionReceiverBuilder = (String, SocketAddress) => ExceptionReceiver | |
-} | |
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala | |
new file mode 100644 | |
index 0000000..8e2f197 | |
--- /dev/null | |
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala | |
@@ -0,0 +1,21 @@ | |
+package com.twitter.finagle.filter | |
+ | |
+import com.twitter.util.{Monitor, Future, Return, Throw} | |
+ | |
+import com.twitter.finagle.{SimpleFilter, Service} | |
+ | |
+class MonitorFilter[Req, Rep](monitor: Monitor) | |
+ extends SimpleFilter[Req, Rep] | |
+{ | |
+ def apply( | |
+ request: Req, | |
+ service: Service[Req, Rep] | |
+ ): Future[Rep] = Monitor.using(monitor) { | |
+ Monitor.Try { service(request) } match { | |
+ case Return(reply) => | |
+ reply onFailure { exc => Monitor.handle(exc) } | |
+ case Throw(exc) => | |
+ Future.exception(exc) | |
+ } | |
+ } | |
+} | |
diff --git a/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala b/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala | |
index 61d40f9..a5b070d 100644 | |
--- a/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala | |
+++ b/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala | |
@@ -11,7 +11,7 @@ import org.jboss.netty.channel.{ | |
ChannelPipeline, DownstreamMessageEvent, | |
ChannelStateEvent, Channels} | |
-import com.twitter.util.{Future, Promise} | |
+import com.twitter.util.{Future, Promise, NullMonitor} | |
import com.twitter.finagle.{ClientConnection, Service} | |
import com.twitter.finagle.stats.StatsReceiver | |
@@ -32,7 +32,7 @@ object ServiceToChannelHandlerSpec extends Specification with Mockito { | |
val postponedService = mock[Promise[Service[Foo, String]]] | |
val serviceFactory = { (clientConnection: ClientConnection) => service } | |
val handler = new ServiceToChannelHandler(service, postponedService, serviceFactory, | |
- log, Logger.getLogger(getClass.getName)) | |
+ log, Logger.getLogger(getClass.getName), NullMonitor) | |
val pipeline = mock[ChannelPipeline] | |
val channel = mock[Channel] | |
val closeFuture = Channels.future(channel) | |
diff --git a/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala b/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala | |
new file mode 100644 | |
index 0000000..22af01f | |
--- /dev/null | |
+++ b/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala | |
@@ -0,0 +1,39 @@ | |
+package com.twitter.finagle.filter | |
+ | |
+import org.specs.Specification | |
+import org.specs.mock.Mockito | |
+ | |
+import com.twitter.util.{Monitor, Promise, Return, Throw} | |
+ | |
+import com.twitter.finagle.Service | |
+ | |
+object MonitorFilterSpec extends Specification with Mockito { | |
+ class MockMonitor extends Monitor { | |
+ def handle(cause: Throwable) = false | |
+ } | |
+ | |
+ "MonitorFilter" should { | |
+ val monitor = spy(new MockMonitor) | |
+ val underlying = mock[Service[Int, Int]] | |
+ val reply = new Promise[Int] | |
+ underlying(any) returns reply | |
+ val service = new MonitorFilter(monitor) andThen underlying | |
+ val exc = new RuntimeException | |
+ | |
+ "report Future.exception" in { | |
+ val f = service(123) | |
+ f.poll must beNone | |
+ | |
+ reply() = Throw(exc) | |
+ f.poll must beSome(Throw(exc)) | |
+ there was one(monitor).handle(exc) | |
+ } | |
+ | |
+ "report raw service exception" in { | |
+ underlying(any) throws exc | |
+ val f = service(123) | |
+ f.poll must beSome(Throw(exc)) | |
+ there was one(monitor).handle(exc) | |
+ } | |
+ } | |
+} | |
diff --git a/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala b/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala | |
index 6653a52..ea8d944 100644 | |
--- a/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala | |
+++ b/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala | |
@@ -1,17 +1,21 @@ | |
package com.twitter.finagle.exception | |
-import com.twitter.finagle.tracing.Trace | |
-import com.twitter.util.Time | |
import java.net.{SocketAddress, InetSocketAddress, InetAddress} | |
-import org.apache.scribe.{LogEntry, ResultCode, scribe} | |
import scala.collection.JavaConversions._ | |
-import com.twitter.finagle.builder.ClientBuilder | |
-import com.twitter.finagle.thrift.ThriftClientFramedCodec | |
+ | |
import org.apache.thrift.protocol.TBinaryProtocol | |
-import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver} | |
+import org.apache.scribe.{LogEntry, ResultCode, scribe} | |
+ | |
import com.twitter.util.GZIPStringEncoder | |
+import com.twitter.util.{Time, Monitor} | |
+ | |
+import com.twitter.finagle.tracing.Trace | |
+import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver} | |
+import com.twitter.finagle.builder.ClientBuilder | |
+import com.twitter.finagle.thrift.ThriftClientFramedCodec | |
+ | |
/** | |
- * A collection of methods to construct an ExceptionReceiver that logs to a ScribeHandler | |
+ * A collection of methods to construct a Monitor that logs to a ScribeHandler | |
* specifically for the chickadee exception reporting service. These methods are not generic | |
* enough for general use. | |
*/ | |
@@ -76,7 +80,7 @@ sealed case class Reporter( | |
serviceName: String, | |
statsReceiver: StatsReceiver = NullStatsReceiver, | |
private val sourceAddress: Option[String] = None, | |
- private val clientAddress: Option[String] = None) extends ExceptionReceiver { | |
+ private val clientAddress: Option[String] = None) extends Monitor { | |
/** | |
* Add a modifier to append a client address (i.e. endpoint) to a generated ServiceException. | |
@@ -117,12 +121,14 @@ sealed case class Reporter( | |
* See top level comment for this class for more details on performance | |
* implications. | |
*/ | |
- def receive(t: Throwable) { | |
+ def handle(t: Throwable) = { | |
client.Log(createEntry(t) :: Nil) onSuccess { | |
case ResultCode.OK => statsReceiver.counter("report_exception_ok").incr() | |
case ResultCode.TRY_LATER => statsReceiver.counter("report_exception_try_later").incr() | |
} onFailure { | |
case e => statsReceiver.counter("report_exception_" + e.toString).incr() | |
} | |
+ | |
+ false // did not actually handle | |
} | |
} | |
diff --git a/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala b/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala | |
index f313bba..fc92b4a 100644 | |
--- a/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala | |
+++ b/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala | |
@@ -23,7 +23,7 @@ object ReporterSpec extends Specification with Mockito { | |
val tse = new TestServiceException("service16", "my cool message") | |
"log entries to a client once upon receive" in { | |
- reporter.receive(tse.throwable) | |
+ reporter.handle(tse.throwable) | |
there was one(logger).Log(captor.capture()) | |
} | |
@@ -48,7 +48,7 @@ object ReporterSpec extends Specification with Mockito { | |
val tse = new TestServiceException("service16", "my cool message", clientAddress = Some(InetAddress.getLocalHost.getHostAddress)) | |
"log entries to a client once upon receive" in { | |
- reporter.receive(tse.throwable) | |
+ reporter.handle(tse.throwable) | |
there was one(logger).Log(captor.capture()) | |
} | |
@@ -76,7 +76,7 @@ object ReporterSpec extends Specification with Mockito { | |
val tse = new TestServiceException("service16", "my cool message", clientAddress = Some(InetAddress.getLocalHost.getHostAddress), sourceAddress = Some(socket.getAddress.getHostAddress + ":" + socket.getPort)) | |
"log entries to a client once upon receive" in { | |
- reporter.receive(tse.throwable) | |
+ reporter.handle(tse.throwable) | |
there was one(logger).Log(captor.capture()) | |
} | |
diff --git a/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala b/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala | |
index 879267b..174469f 100644 | |
--- a/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala | |
+++ b/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala | |
@@ -1,11 +1,11 @@ | |
package com.twitter.concurrent | |
-import com.twitter.util.Duration | |
import java.util.concurrent.ArrayBlockingQueue | |
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater | |
import scala.annotation.tailrec | |
import scala.collection.mutable.Queue | |
+import com.twitter.util.{Duration, Monitor} | |
/** | |
* An IVar is an "I-structured variable". It is a mutable cell that begins empty but can | |
diff --git a/util/util-core/src/main/scala/com/twitter/util/Future.scala b/util/util-core/src/main/scala/com/twitter/util/Future.scala | |
index 1731b9f..4d5cb78 100644 | |
--- a/util/util-core/src/main/scala/com/twitter/util/Future.scala | |
+++ b/util/util-core/src/main/scala/com/twitter/util/Future.scala | |
@@ -552,6 +552,9 @@ class Promise[A] private[Promise] ( | |
} | |
} | |
+ /** | |
+ * XXX Note: exceptions are monitored. | |
+ */ | |
override def respond(k: Try[A] => Unit): Future[A] = { | |
// Note that there's a race here, but that's | |
// okay. The resulting Futures are | |
@@ -566,7 +569,7 @@ class Promise[A] private[Promise] ( | |
if (chained eq null) | |
chained = new Promise(ivar.chained, cancelled) | |
val next = chained | |
- respond0(k) | |
+ respond0 { r => Monitor { k(r) } } | |
next | |
} | |
@@ -634,7 +637,7 @@ class Promise[A] private[Promise] ( | |
override def filter(p: A => Boolean): Future[A] = { | |
makePromise[A](this) { promise => | |
- respond0 { x => promise() = x.filter(p) } | |
+ respond0 { x => Monitor { promise() = x.filter(p) } } | |
} | |
} | |
diff --git a/util/util-core/src/main/scala/com/twitter/util/Monitor.scala b/util/util-core/src/main/scala/com/twitter/util/Monitor.scala | |
new file mode 100644 | |
index 0000000..7009991 | |
--- /dev/null | |
+++ b/util/util-core/src/main/scala/com/twitter/util/Monitor.scala | |
@@ -0,0 +1,107 @@ | |
+package com.twitter.util | |
+ | |
+import java.util.logging.{Logger, Level} | |
+import scala.annotation.tailrec | |
+ | |
+/** | |
+ * Wraps an exception that happens when handling another exception in | |
+ * a monitor. | |
+ */ | |
+case class MonitorException( | |
+ handlingExc: Throwable, | |
+ monitorExc: Throwable | |
+) extends Exception { | |
+ override def getMessage = | |
+ "threw exception \""+monitorExc+"\" while handling "+ | |
+ "another exception \""+handlingExc+"\"" | |
+} | |
+ | |
+/** | |
+ * A Monitor is a composable exception handler. | |
+ */ | |
+trait Monitor { self => | |
+ /** | |
+ * Attempt to handle the exception {{exc}}. | |
+ * | |
+ * @return whether the exception was handled by this Monitor | |
+ */ | |
+ def handle(exc: Throwable): Boolean | |
+ | |
+ /** | |
+ * Run {{f}} inside of the monitor context. That is, if the | |
+ * computation {{f}} throws an exception, the Monitor will attempt | |
+ * to handle it. If it fails to do so, the exception is propagated | |
+ * to the caller | |
+ */ | |
+ def apply(f: => Unit) { | |
+ try f catch { case exc => if (!handle(exc)) throw exc } | |
+ } | |
+ | |
+ def orElse(next: Monitor) = new Monitor { | |
+ def handle(exc: Throwable) = | |
+ self.tryHandle(exc) rescue { case exc1 => | |
+ next.tryHandle(exc1) | |
+ } isReturn | |
+ } | |
+ | |
+ def andThen(next: Monitor) = new Monitor { | |
+ def handle(exc: Throwable) = | |
+ self.tryHandle(exc) match { | |
+ case Return(()) => | |
+ next.tryHandle(exc) | |
+ true | |
+ case Throw(exc1) => | |
+ next.tryHandle(exc1).isReturn | |
+ } | |
+ } | |
+ | |
+ def Try[T](f: => T): Try[T] = com.twitter.util.Try(f) onFailure { handle(_) } | |
+ | |
+ protected def tryHandle(exc: Throwable) = | |
+ com.twitter.util.Try { self.handle(exc) } rescue { | |
+ case monitorExc => Throw(MonitorException(exc, monitorExc)) | |
+ } flatMap { ok => | |
+ if (ok) Return(()) | |
+ else Throw(exc): Try[Boolean] | |
+ } | |
+} | |
+ | |
+object NullMonitor extends Monitor { | |
+ def handle(exc: Throwable) = false | |
+} | |
+ | |
+object Monitor extends Monitor { | |
+ private[this] val local = new Local[Monitor] | |
+ | |
+ def get = local() getOrElse NullMonitor | |
+ def set(m: Monitor) { local() = m } | |
+ | |
+ @inline | |
+ def using[T](m: Monitor)(f: => T): T = restore { | |
+ set(m) | |
+ f | |
+ } | |
+ | |
+ @inline | |
+ def restore[T](f: => T): T = { | |
+ val saved = local() | |
+ try f finally { local.set(saved) } | |
+ } | |
+ | |
+ // shoudl we really include the root monitor here? | |
+ | |
+ def handle(exc: Throwable): Boolean = | |
+ (get orElse RootMonitor).handle(exc) | |
+} | |
+ | |
+object RootMonitor extends Monitor { | |
+ private[this] val log = Logger.getLogger("monitor") | |
+ | |
+ def handle(exc: Throwable) = { | |
+ log.log(Level.SEVERE, "Exception propagated to the root monitor!", exc) | |
+ /* | |
+ * We select to call this handled. It's the last resort. | |
+ */ | |
+ true | |
+ } | |
+} | |
diff --git a/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala b/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala | |
index 6550bc2..25d6361 100644 | |
--- a/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala | |
+++ b/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala | |
@@ -521,6 +521,21 @@ class FutureSpec extends Specification with Mockito { | |
f()= Return(1) | |
wasCalledWith mustEqual Some(1) | |
} | |
+ | |
+ "monitor exceptions" in { | |
+ val m = spy(new MonitorSpec.MockMonitor) | |
+ val exc = new Exception | |
+ m.handle(any) returns true | |
+ val p = new Promise[Int] | |
+ | |
+ Monitor.using(m) { | |
+ p ensure { throw exc } | |
+ } | |
+ | |
+ there was no(m).handle(any) | |
+ p.update(Return(1)) mustNot throwA[Throwable] | |
+ there was one(m).handle(exc) | |
+ } | |
} | |
"Future() handles exceptions" in { | |
diff --git a/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala b/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala | |
new file mode 100644 | |
index 0000000..18c3af8 | |
--- /dev/null | |
+++ b/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala | |
@@ -0,0 +1,117 @@ | |
+package com.twitter.util | |
+ | |
+import org.specs.Specification | |
+import org.specs.mock.Mockito | |
+import com.twitter.conversions.time._ | |
+import java.util.concurrent.ConcurrentLinkedQueue | |
+import com.twitter.concurrent.SimpleSetter | |
+ | |
+object MonitorSpec extends Specification with Mockito { | |
+ class MockMonitor extends Monitor { | |
+ def handle(cause: Throwable) = false | |
+ } | |
+ | |
+ "Monitor#orElse" should { | |
+ val m0, m1, m2 = spy(new MockMonitor) | |
+ Seq(m0, m1, m2) foreach { _.handle(any) returns true } | |
+ val exc = new Exception | |
+ val m = m0 orElse m1 orElse m2 | |
+ | |
+ "stop at first successful handle" in { | |
+ m.handle(exc) must beTrue | |
+ | |
+ there was one(m0).handle(exc) | |
+ there was no(m1).handle(exc) | |
+ there was no(m2).handle(exc) | |
+ | |
+ m0.handle(any) returns false | |
+ | |
+ m.handle(exc) must beTrue | |
+ there were two(m0).handle(exc) | |
+ there was one(m1).handle(exc) | |
+ there was no(m2).handle(exc) | |
+ } | |
+ | |
+ "fail when no nothing got handled" in { | |
+ Seq(m0, m1, m2) foreach { _.handle(any) returns false } | |
+ m.handle(exc) must beFalse | |
+ Seq(m0, m1, m2) foreach { m => there was one(m).handle(exc) } | |
+ } | |
+ | |
+ "wrap Monitor exceptions and pass them on" in { | |
+ val rte = new RuntimeException("really bad news") | |
+ m0.handle(any) throws rte | |
+ m.handle(exc) must beTrue | |
+ there was one(m0).handle(exc) | |
+ there was one(m1).handle(MonitorException(exc, rte)) | |
+ } | |
+ } | |
+ | |
+ "Monitor#andThen" should { | |
+ val m0, m1 = spy(new MockMonitor) | |
+ m0.handle(any) returns true | |
+ m1.handle(any) returns true | |
+ val m = m0 andThen m1 | |
+ val exc = new Exception | |
+ | |
+ "run all monitors" in { | |
+ m.handle(exc) must beTrue | |
+ there was one(m0).handle(exc) | |
+ there was one(m1).handle(exc) | |
+ } | |
+ | |
+ "be succcessful when any underlying monitor is" in { | |
+ m0.handle(any) returns false | |
+ m.handle(exc) must beTrue | |
+ m1.handle(any) returns false | |
+ m.handle(exc) must beFalse | |
+ } | |
+ | |
+ "wrap Monitor exceptions and pass them on" in { | |
+ val rte = new RuntimeException("really bad news") | |
+ m0.handle(any) throws rte | |
+ m.handle(exc) must beTrue | |
+ there was one(m0).handle(exc) | |
+ there was one(m1).handle(MonitorException(exc, rte)) | |
+ } | |
+ | |
+ "fail if both monitors throw" in { | |
+ val rte = new RuntimeException("really bad news") | |
+ m0.handle(any) throws rte | |
+ m1.handle(any) throws rte | |
+ m.handle(exc) must beFalse | |
+ } | |
+ } | |
+ | |
+ "Monitor.get, Monitor.set()" should { | |
+ val m = spy(new MockMonitor) | |
+ m.handle(any) returns true | |
+ | |
+ "maintain current monitor" in Monitor.restore { | |
+ Monitor.set(m) | |
+ Monitor.get mustBe m | |
+ } | |
+ } | |
+ | |
+ "Monitor.handle" should { | |
+ val m = spy(new MockMonitor) | |
+ m.handle(any) returns true | |
+ | |
+ "dispatch to current monitor" in Monitor.restore { | |
+ val exc = new Exception | |
+ Monitor.set(m) | |
+ Monitor.handle(exc) | |
+ there was one(m).handle(exc) | |
+ } | |
+ } | |
+ | |
+ "Monitor.restore" should { | |
+ "restore current configuration" in { | |
+ val orig = Monitor.get | |
+ Monitor.restore { | |
+ Monitor.set(mock[Monitor]) | |
+ } | |
+ Monitor.get mustBe orig | |
+ } | |
+ } | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment