Skip to content

Instantly share code, notes, and snippets.

@mariusae
Created November 4, 2011 17:24
Show Gist options
  • Save mariusae/1339915 to your computer and use it in GitHub Desktop.
Save mariusae/1339915 to your computer and use it in GitHub Desktop.
Monitors for Finagle.
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
index 1df9c58..32663db 100644
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ClientBuilder.scala
@@ -56,7 +56,7 @@ import org.jboss.netty.channel.socket.nio._
import org.jboss.netty.handler.ssl._
import org.jboss.netty.handler.timeout.IdleStateHandler
-import com.twitter.util.{Future, Duration, Throw, Return}
+import com.twitter.util.{Future, Duration, Throw, Return, Monitor, NullMonitor}
import com.twitter.util.TimeConversions._
import com.twitter.finagle.channel._
@@ -65,6 +65,7 @@ import com.twitter.finagle.pool._
import com.twitter.finagle._
import com.twitter.finagle.service._
import com.twitter.finagle.factory._
+import com.twitter.finagle.filter.MonitorFilter
import com.twitter.finagle.stats.{StatsReceiver, RollupStatsReceiver, NullStatsReceiver, GlobalStatsReceiver}
import com.twitter.finagle.loadbalancer.{LoadBalancedFactory, LeastQueuedStrategy, HeapBalancer}
import com.twitter.finagle.ssl.{Ssl, SslConnectHandler}
@@ -150,7 +151,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
private val _readerIdleTimeout : Option[Duration] = None,
private val _writerIdleTimeout : Option[Duration] = None,
private val _statsReceiver : Option[StatsReceiver] = None,
- private val _exceptionReceiver : Option[ClientExceptionReceiverBuilder] = None,
+ private val _monitor : Option[String => Monitor] = None,
private val _name : Option[String] = Some("client"),
private val _sendBufferSize : Option[Int] = None,
private val _recvBufferSize : Option[Int] = None,
@@ -176,7 +177,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
val connectTimeout = _connectTimeout
val timeout = _timeout
val statsReceiver = _statsReceiver
- val exceptionReceiver = _exceptionReceiver
+ val monitor = _monitor
val keepAlive = _keepAlive
val readerIdleTimeout = _readerIdleTimeout
val writerIdleTimeout = _writerIdleTimeout
@@ -208,7 +209,7 @@ final case class ClientConfig[Req, Rep, HasCluster, HasCodec, HasHostConnectionL
"readerIdleTimeout" -> Some(_readerIdleTimeout),
"writerIdleTimeout" -> Some(_writerIdleTimeout),
"statsReceiver" -> _statsReceiver,
- "exceptionReceiver" -> _exceptionReceiver,
+ "monitor" -> _monitor,
"name" -> _name,
"hostConnectionCoresize" -> _hostConfig.hostConnectionCoresize,
"hostConnectionLimit" -> _hostConfig.hostConnectionLimit,
@@ -511,8 +512,8 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
def tracer(tracer: Tracer): This =
withConfig(_.copy(_tracerFactory = () => tracer))
- def exceptionReceiver(erFactory: ClientExceptionReceiverBuilder): This =
- withConfig(_.copy(_exceptionReceiver = Some(erFactory)))
+ def monitor(mFactory: String => Monitor): This =
+ withConfig(_.copy(_monitor = Some(mFactory)))
/**
* Log very detailed debug information to the given logger.
@@ -664,7 +665,6 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
)
var factory: ServiceFactory[Req, Rep] = null
-
val bs = buildBootstrap(codec, host)
factory = new ChannelServiceFactory[Req, Rep](
bs, prepareService(codec) _, hostStatsReceiver)
@@ -682,15 +682,14 @@ class ClientBuilder[Req, Rep, HasCluster, HasCodec, HasHostConnectionLimit] priv
factory = new FailureAccrualFactory(factory, numFailures, markDeadFor)
}
- val exceptionFilter = new ExceptionFilter[Req, Rep](
- config.exceptionReceiver map {
- _(config.name.get)
- } getOrElse {
- NullExceptionReceiver
- }
- )
val statsFilter = new StatsFilter[Req, Rep](hostStatsReceiver)
- factory = exceptionFilter andThen statsFilter andThen factory
+ val monitorFilter = new MonitorFilter[Req, Rep]({
+ config.monitor map { mf =>
+ mf(config.name.get)
+ } getOrElse NullMonitor
+ })
+
+ factory = monitorFilter andThen statsFilter andThen factory
factory
}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
index 6e1e8cb..8b2b8b0 100644
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -53,7 +53,7 @@ import org.jboss.netty.channel.socket.nio._
import org.jboss.netty.handler.ssl._
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
-import com.twitter.util.Duration
+import com.twitter.util.{Duration, Monitor, NullMonitor}
import com.twitter.conversions.time._
import com.twitter.finagle._
@@ -121,7 +121,7 @@ object ServerConfig {
final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName](
private val _codecFactory: Option[CodecFactory[Req, Rep]#Server] = None,
private val _statsReceiver: Option[StatsReceiver] = None,
- private val _exceptionReceiver: Option[ServerExceptionReceiverBuilder] = None,
+ private val _monitor: Option[(String, SocketAddress) => Monitor] = None,
private val _name: Option[String] = None,
private val _sendBufferSize: Option[Int] = None,
private val _recvBufferSize: Option[Int] = None,
@@ -151,7 +151,7 @@ final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName](
*/
val codecFactory = _codecFactory
val statsReceiver = _statsReceiver
- val exceptionReceiver = _exceptionReceiver
+ val monitor = _monitor
val name = _name
val sendBufferSize = _sendBufferSize
val recvBufferSize = _recvBufferSize
@@ -175,7 +175,7 @@ final case class ServerConfig[Req, Rep, HasCodec, HasBindTo, HasName](
def toMap = Map(
"codecFactory" -> _codecFactory,
"statsReceiver" -> _statsReceiver,
- "exceptionReceiver" -> _exceptionReceiver,
+ "monitor" -> _monitor,
"name" -> _name,
"sendBufferSize" -> _sendBufferSize,
"recvBufferSize" -> _recvBufferSize,
@@ -317,8 +317,8 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder](
def writeCompletionTimeout(howlong: Duration): This =
withConfig(_.copy(_writeCompletionTimeout = Some(howlong)))
- def exceptionReceiver(erFactory: ServerExceptionReceiverBuilder): This =
- withConfig(_.copy(_exceptionReceiver = Some(erFactory)))
+ def monitor(mFactory: (String, SocketAddress) => Monitor): This =
+ withConfig(_.copy(_monitor = Some(mFactory)))
def tracerFactory(factory: Tracer.Factory): This =
withConfig(_.copy(_tracerFactory = factory))
@@ -495,17 +495,7 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder](
new ProxyService(postponedService flatMap { s => codec.prepareService(s) })
}
- // Add the exception service at the bottom layer
- // This is not required, but argubably the best style
- val exceptionFilter = new ExceptionFilter[Req, Rep] (
- config.exceptionReceiver map {
- _(config.name.get, config.bindTo.get)
- } getOrElse {
- NullExceptionReceiver
- }
- )
-
- service = exceptionFilter andThen service
+ // ExceptionFilter is gone; exceptions are reported via the Monitor passed to ServiceToChannelHandler.
statsFilter foreach { sf =>
service = sf andThen service
@@ -547,9 +537,16 @@ class ServerBuilder[Req, Rep, HasCodec, HasBindTo, HasName] private[builder](
// one here.
service = (new TracingFilter(tracer)) andThen service
+ val monitor = config.monitor map {
+ _(config.name.get, config.bindTo.get)
+ } getOrElse {
+ NullMonitor
+ }
+
val channelHandler = new ServiceToChannelHandler(
service, postponedService, serviceFactory,
- scopedOrNullStatsReceiver, Logger.getLogger(getClass.getName))
+ scopedOrNullStatsReceiver, Logger.getLogger(getClass.getName),
+ monitor)
/*
* Register the channel so we can wait for them for a drain. We close the socket but wait
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
index 30824a6..4b89173 100644
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelService.scala
@@ -9,7 +9,9 @@ import org.jboss.netty.channel.{
SimpleChannelUpstreamHandler, ExceptionEvent,
ChannelStateEvent}
-import com.twitter.util.{Future, Promise, Throw, Try, Time, Return}
+import com.twitter.util.{
+ Future, Promise, Throw, Try, Time, Return,
+ Monitor, NullMonitor}
import com.twitter.finagle._
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
index 877c899..a726472 100644
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
@@ -2,13 +2,16 @@ package com.twitter.finagle.channel
import java.util.concurrent.atomic.AtomicReference
import java.util.logging.{Level, Logger}
+
+import org.jboss.netty.channel._
+import org.jboss.netty.handler.timeout.ReadTimeoutException
+
+import com.twitter.util.{Future, Promise, Return, Throw, Monitor}
+
import com.twitter.finagle.{ClientConnection, CodecException, Service, WriteTimedOutException}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.service.ProxyService
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
-import com.twitter.util.{Future, Promise, Return, Throw}
-import org.jboss.netty.channel._
-import org.jboss.netty.handler.timeout.ReadTimeoutException
private[finagle] object ServiceToChannelHandler {
// valid transitions are:
@@ -26,7 +29,8 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
postponedService: Promise[Service[Req, Rep]],
serviceFactory: (ClientConnection) => Service[Req, Rep],
statsReceiver: StatsReceiver,
- log: Logger)
+ log: Logger,
+ parentMonitor: Monitor)
extends ChannelClosingHandler with ConnectionLifecycleHandler
{
import ServiceToChannelHandler._
@@ -34,6 +38,13 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
private[this] val state = new AtomicReference[State](Idle)
private[this] val onShutdownPromise = new Promise[Unit]
+ private[this] val monitor =
+ parentMonitor andThen new Monitor {
+ def handle(cause: Throwable) = {
+ shutdown()
+ true
+ }
+ }
// we know there's only one outstanding request at a time because
// ServerBuilder adds it in a separate layer.
@@ -68,7 +79,10 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
} while (continue)
}
- override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
+ override def messageReceived(
+ ctx: ChannelHandlerContext,
+ e: MessageEvent
+ ): Unit = Monitor.using(monitor) {
val channel = ctx.getChannel
val message = e.getMessage
@@ -87,26 +101,17 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
case _ => /* let these fall on the floor */ return
}
- try {
- val promise = service(message.asInstanceOf[Req])
+ Monitor.Try {
+ service(message.asInstanceOf[Req])
+ } foreach { promise =>
currentResponse = Some(promise)
promise respond {
case Return(value) =>
currentResponse = None
Channels.write(channel, value)
- case Throw(e: Throwable) =>
- log.log(Level.WARNING, "service exception", e)
- shutdown()
+ case Throw(e) =>
+ Monitor.handle(e)
}
- } catch {
- case e: ClassCastException =>
- log.log(
- Level.SEVERE,
- "Got ClassCastException while processing a " +
- "message. This is a codec bug. %s".format(e))
- shutdown()
- case e =>
- Channels.fireExceptionCaught(channel, e)
}
}
@@ -146,29 +151,17 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
*/
override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
val cause = e.getCause
- val level = cause match {
- case e: java.nio.channels.ClosedChannelException =>
- Level.FINEST
+
+ cause match {
+ //XXX: should these be handled by the monitor too?
case e: ReadTimeoutException =>
statsReceiver.counter("read_timeout").incr()
- Level.FINEST
case e: WriteTimedOutException =>
statsReceiver.counter("write_timeout").incr()
- Level.FINEST
- case e: java.io.IOException
- if (e.getMessage == "Connection reset by peer" ||
- e.getMessage == "Broken pipe" ||
- e.getMessage == "Connection timed out" ||
- e.getMessage == "No route to host") =>
- Level.FINEST
- case e: javax.net.ssl.SSLException =>
- Level.FINEST
- case e: Throwable =>
- Level.WARNING
+ case _ =>
+ ()
}
- log.log(level, "Exception caught by service channel handler", cause)
-
- shutdown()
+ monitor.handle(cause)
}
}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala
deleted file mode 100644
index 27b1781..0000000
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionFilter.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package com.twitter.finagle.exception
-
-import com.twitter.finagle.{Service, SimpleFilter}
-import com.twitter.util.Future
-import com.twitter.finagle.tracing.Trace
-
-/**
- * A filter that resides in both the client- and server-side finagle service stacks and
- * passes the Throwable that underlies Throws to an ExceptionReceiver.
- *
- * It resides in the service stack under all circumstances. If this feature is undesired, the
- * filter will send the exceptional values to a NullExceptionReceiver.
- */
-class ExceptionFilter[Req, Rep](exceptionReceiver: ExceptionReceiver)
- extends SimpleFilter[Req, Rep]
-{
- def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
- service(request) onFailure { e =>
- exceptionReceiver.receive(e)
- Trace.recordBinary("finagle.exception", e.toString)
- }
- }
-}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala
deleted file mode 100644
index 71dda6a..0000000
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/ExceptionReceiver.scala
+++ /dev/null
@@ -1,13 +0,0 @@
-package com.twitter.finagle.exception
-
-/**
- * A generic interface for receiving exceptions from Future.throw values
- * in the finagle service stack.
- */
-trait ExceptionReceiver {
- def receive(e: Throwable)
-}
-
-object NullExceptionReceiver extends ExceptionReceiver {
- def receive(e: Throwable) {}
-}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala
deleted file mode 100644
index 22a57e3..0000000
--- a/finagle/finagle-core/src/main/scala/com/twitter/finagle/exception/package.scala
+++ /dev/null
@@ -1,15 +0,0 @@
-package com.twitter.finagle
-
-import java.net.SocketAddress
-
-package object exception {
- /**
- * A client is passed the service name
- */
- type ClientExceptionReceiverBuilder = String => ExceptionReceiver
-
- /**
- * A server factory is passed the service name and the SocketAddress that it is bound to
- */
- type ServerExceptionReceiverBuilder = (String, SocketAddress) => ExceptionReceiver
-}
diff --git a/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala b/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala
new file mode 100644
index 0000000..8e2f197
--- /dev/null
+++ b/finagle/finagle-core/src/main/scala/com/twitter/finagle/filter/MonitorFilter.scala
@@ -0,0 +1,21 @@
+package com.twitter.finagle.filter
+
+import com.twitter.util.{Monitor, Future, Return, Throw}
+
+import com.twitter.finagle.{SimpleFilter, Service}
+
+class MonitorFilter[Req, Rep](monitor: Monitor)
+ extends SimpleFilter[Req, Rep]
+{
+ def apply(
+ request: Req,
+ service: Service[Req, Rep]
+ ): Future[Rep] = Monitor.using(monitor) {
+ Monitor.Try { service(request) } match {
+ case Return(reply) =>
+ reply onFailure { exc => Monitor.handle(exc) }
+ case Throw(exc) =>
+ Future.exception(exc)
+ }
+ }
+}
diff --git a/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala b/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
index 61d40f9..a5b070d 100644
--- a/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
+++ b/finagle/finagle-core/src/test/scala/com/twitter/finagle/channel/ServiceToChannelHandlerSpec.scala
@@ -11,7 +11,7 @@ import org.jboss.netty.channel.{
ChannelPipeline, DownstreamMessageEvent,
ChannelStateEvent, Channels}
-import com.twitter.util.{Future, Promise}
+import com.twitter.util.{Future, Promise, NullMonitor}
import com.twitter.finagle.{ClientConnection, Service}
import com.twitter.finagle.stats.StatsReceiver
@@ -32,7 +32,7 @@ object ServiceToChannelHandlerSpec extends Specification with Mockito {
val postponedService = mock[Promise[Service[Foo, String]]]
val serviceFactory = { (clientConnection: ClientConnection) => service }
val handler = new ServiceToChannelHandler(service, postponedService, serviceFactory,
- log, Logger.getLogger(getClass.getName))
+ log, Logger.getLogger(getClass.getName), NullMonitor)
val pipeline = mock[ChannelPipeline]
val channel = mock[Channel]
val closeFuture = Channels.future(channel)
diff --git a/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala b/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala
new file mode 100644
index 0000000..22af01f
--- /dev/null
+++ b/finagle/finagle-core/src/test/scala/com/twitter/finagle/filter/MonitorFilterSpec.scala
@@ -0,0 +1,39 @@
+package com.twitter.finagle.filter
+
+import org.specs.Specification
+import org.specs.mock.Mockito
+
+import com.twitter.util.{Monitor, Promise, Return, Throw}
+
+import com.twitter.finagle.Service
+
+object MonitorFilterSpec extends Specification with Mockito {
+ class MockMonitor extends Monitor {
+ def handle(cause: Throwable) = false
+ }
+
+ "MonitorFilter" should {
+ val monitor = spy(new MockMonitor)
+ val underlying = mock[Service[Int, Int]]
+ val reply = new Promise[Int]
+ underlying(any) returns reply
+ val service = new MonitorFilter(monitor) andThen underlying
+ val exc = new RuntimeException
+
+ "report Future.exception" in {
+ val f = service(123)
+ f.poll must beNone
+
+ reply() = Throw(exc)
+ f.poll must beSome(Throw(exc))
+ there was one(monitor).handle(exc)
+ }
+
+ "report raw service exception" in {
+ underlying(any) throws exc
+ val f = service(123)
+ f.poll must beSome(Throw(exc))
+ there was one(monitor).handle(exc)
+ }
+ }
+}
diff --git a/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala b/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala
index 6653a52..ea8d944 100644
--- a/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala
+++ b/finagle/finagle-exception/src/main/scala/com/twitter/finagle/exception/Reporter.scala
@@ -1,17 +1,21 @@
package com.twitter.finagle.exception
-import com.twitter.finagle.tracing.Trace
-import com.twitter.util.Time
import java.net.{SocketAddress, InetSocketAddress, InetAddress}
-import org.apache.scribe.{LogEntry, ResultCode, scribe}
import scala.collection.JavaConversions._
-import com.twitter.finagle.builder.ClientBuilder
-import com.twitter.finagle.thrift.ThriftClientFramedCodec
+
import org.apache.thrift.protocol.TBinaryProtocol
-import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
+import org.apache.scribe.{LogEntry, ResultCode, scribe}
+
import com.twitter.util.GZIPStringEncoder
+import com.twitter.util.{Time, Monitor}
+
+import com.twitter.finagle.tracing.Trace
+import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
+import com.twitter.finagle.builder.ClientBuilder
+import com.twitter.finagle.thrift.ThriftClientFramedCodec
+
/**
- * A collection of methods to construct an ExceptionReceiver that logs to a ScribeHandler
+ * A collection of methods to construct a Monitor that logs to a ScribeHandler
* specifically for the chickadee exception reporting service. These methods are not generic
* enough for general use.
*/
@@ -76,7 +80,7 @@ sealed case class Reporter(
serviceName: String,
statsReceiver: StatsReceiver = NullStatsReceiver,
private val sourceAddress: Option[String] = None,
- private val clientAddress: Option[String] = None) extends ExceptionReceiver {
+ private val clientAddress: Option[String] = None) extends Monitor {
/**
* Add a modifier to append a client address (i.e. endpoint) to a generated ServiceException.
@@ -117,12 +121,14 @@ sealed case class Reporter(
* See top level comment for this class for more details on performance
* implications.
*/
- def receive(t: Throwable) {
+ def handle(t: Throwable) = {
client.Log(createEntry(t) :: Nil) onSuccess {
case ResultCode.OK => statsReceiver.counter("report_exception_ok").incr()
case ResultCode.TRY_LATER => statsReceiver.counter("report_exception_try_later").incr()
} onFailure {
case e => statsReceiver.counter("report_exception_" + e.toString).incr()
}
+
+ false // did not actually handle
}
}
diff --git a/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala b/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala
index f313bba..fc92b4a 100644
--- a/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala
+++ b/finagle/finagle-exception/src/test/scala/com/twitter/finagle/exception/ReporterSpec.scala
@@ -23,7 +23,7 @@ object ReporterSpec extends Specification with Mockito {
val tse = new TestServiceException("service16", "my cool message")
"log entries to a client once upon receive" in {
- reporter.receive(tse.throwable)
+ reporter.handle(tse.throwable)
there was one(logger).Log(captor.capture())
}
@@ -48,7 +48,7 @@ object ReporterSpec extends Specification with Mockito {
val tse = new TestServiceException("service16", "my cool message", clientAddress = Some(InetAddress.getLocalHost.getHostAddress))
"log entries to a client once upon receive" in {
- reporter.receive(tse.throwable)
+ reporter.handle(tse.throwable)
there was one(logger).Log(captor.capture())
}
@@ -76,7 +76,7 @@ object ReporterSpec extends Specification with Mockito {
val tse = new TestServiceException("service16", "my cool message", clientAddress = Some(InetAddress.getLocalHost.getHostAddress), sourceAddress = Some(socket.getAddress.getHostAddress + ":" + socket.getPort))
"log entries to a client once upon receive" in {
- reporter.receive(tse.throwable)
+ reporter.handle(tse.throwable)
there was one(logger).Log(captor.capture())
}
diff --git a/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala b/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala
index 879267b..174469f 100644
--- a/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala
+++ b/util/util-core/src/main/scala/com/twitter/concurrent/IVar.scala
@@ -1,11 +1,11 @@
package com.twitter.concurrent
-import com.twitter.util.Duration
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import scala.annotation.tailrec
import scala.collection.mutable.Queue
+import com.twitter.util.{Duration, Monitor}
/**
* An IVar is an "I-structured variable". It is a mutable cell that begins empty but can
diff --git a/util/util-core/src/main/scala/com/twitter/util/Future.scala b/util/util-core/src/main/scala/com/twitter/util/Future.scala
index 1731b9f..4d5cb78 100644
--- a/util/util-core/src/main/scala/com/twitter/util/Future.scala
+++ b/util/util-core/src/main/scala/com/twitter/util/Future.scala
@@ -552,6 +552,9 @@ class Promise[A] private[Promise] (
}
}
+ /**
+ * XXX Note: exceptions thrown by `k` are passed to Monitor for handling.
+ */
override def respond(k: Try[A] => Unit): Future[A] = {
// Note that there's a race here, but that's
// okay. The resulting Futures are
@@ -566,7 +569,7 @@ class Promise[A] private[Promise] (
if (chained eq null)
chained = new Promise(ivar.chained, cancelled)
val next = chained
- respond0(k)
+ respond0 { r => Monitor { k(r) } }
next
}
@@ -634,7 +637,7 @@ class Promise[A] private[Promise] (
override def filter(p: A => Boolean): Future[A] = {
makePromise[A](this) { promise =>
- respond0 { x => promise() = x.filter(p) }
+ respond0 { x => Monitor { promise() = x.filter(p) } }
}
}
diff --git a/util/util-core/src/main/scala/com/twitter/util/Monitor.scala b/util/util-core/src/main/scala/com/twitter/util/Monitor.scala
new file mode 100644
index 0000000..7009991
--- /dev/null
+++ b/util/util-core/src/main/scala/com/twitter/util/Monitor.scala
@@ -0,0 +1,107 @@
+package com.twitter.util
+
+import java.util.logging.{Logger, Level}
+import scala.annotation.tailrec
+
+/**
+ * Wraps an exception that happens when handling another exception in
+ * a monitor.
+ */
+case class MonitorException(
+ handlingExc: Throwable,
+ monitorExc: Throwable
+) extends Exception {
+ override def getMessage =
+ "threw exception \""+monitorExc+"\" while handling "+
+ "another exception \""+handlingExc+"\""
+}
+
+/**
+ * A Monitor is a composable exception handler.
+ */
+trait Monitor { self =>
+ /**
+ * Attempt to handle the exception `exc`.
+ *
+ * @return whether the exception was handled by this Monitor
+ */
+ def handle(exc: Throwable): Boolean
+
+ /**
+ * Run `f` inside of the monitor context. That is, if the
+ * computation `f` throws an exception, the Monitor will attempt
+ * to handle it. If it fails to do so, the exception is propagated
+ * to the caller.
+ */
+ def apply(f: => Unit) {
+ try f catch { case exc => if (!handle(exc)) throw exc }
+ }
+
+ def orElse(next: Monitor) = new Monitor {
+ def handle(exc: Throwable) =
+ self.tryHandle(exc) rescue { case exc1 =>
+ next.tryHandle(exc1)
+ } isReturn
+ }
+
+ def andThen(next: Monitor) = new Monitor {
+ def handle(exc: Throwable) =
+ self.tryHandle(exc) match {
+ case Return(()) =>
+ next.tryHandle(exc)
+ true
+ case Throw(exc1) =>
+ next.tryHandle(exc1).isReturn
+ }
+ }
+
+ def Try[T](f: => T): Try[T] = com.twitter.util.Try(f) onFailure { handle(_) }
+
+ protected def tryHandle(exc: Throwable) =
+ com.twitter.util.Try { self.handle(exc) } rescue {
+ case monitorExc => Throw(MonitorException(exc, monitorExc))
+ } flatMap { ok =>
+ if (ok) Return(())
+ else Throw(exc): Try[Boolean]
+ }
+}
+
+object NullMonitor extends Monitor {
+ def handle(exc: Throwable) = false
+}
+
+object Monitor extends Monitor {
+ private[this] val local = new Local[Monitor]
+
+ def get = local() getOrElse NullMonitor
+ def set(m: Monitor) { local() = m }
+
+ @inline
+ def using[T](m: Monitor)(f: => T): T = restore {
+ set(m)
+ f
+ }
+
+ @inline
+ def restore[T](f: => T): T = {
+ val saved = local()
+ try f finally { local.set(saved) }
+ }
+
+ // Should we really include the root monitor here?
+
+ def handle(exc: Throwable): Boolean =
+ (get orElse RootMonitor).handle(exc)
+}
+
+object RootMonitor extends Monitor {
+ private[this] val log = Logger.getLogger("monitor")
+
+ def handle(exc: Throwable) = {
+ log.log(Level.SEVERE, "Exception propagated to the root monitor!", exc)
+ /*
+ * We choose to report this as handled: the root monitor is the last resort.
+ */
+ true
+ }
+}
diff --git a/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala b/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala
index 6550bc2..25d6361 100644
--- a/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala
+++ b/util/util-core/src/test/scala/com/twitter/util/FutureSpec.scala
@@ -521,6 +521,21 @@ class FutureSpec extends Specification with Mockito {
f()= Return(1)
wasCalledWith mustEqual Some(1)
}
+
+ "monitor exceptions" in {
+ val m = spy(new MonitorSpec.MockMonitor)
+ val exc = new Exception
+ m.handle(any) returns true
+ val p = new Promise[Int]
+
+ Monitor.using(m) {
+ p ensure { throw exc }
+ }
+
+ there was no(m).handle(any)
+ p.update(Return(1)) mustNot throwA[Throwable]
+ there was one(m).handle(exc)
+ }
}
"Future() handles exceptions" in {
diff --git a/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala b/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala
new file mode 100644
index 0000000..18c3af8
--- /dev/null
+++ b/util/util-core/src/test/scala/com/twitter/util/MonitorSpec.scala
@@ -0,0 +1,117 @@
+package com.twitter.util
+
+import org.specs.Specification
+import org.specs.mock.Mockito
+import com.twitter.conversions.time._
+import java.util.concurrent.ConcurrentLinkedQueue
+import com.twitter.concurrent.SimpleSetter
+
+object MonitorSpec extends Specification with Mockito {
+ class MockMonitor extends Monitor {
+ def handle(cause: Throwable) = false
+ }
+
+ "Monitor#orElse" should {
+ val m0, m1, m2 = spy(new MockMonitor)
+ Seq(m0, m1, m2) foreach { _.handle(any) returns true }
+ val exc = new Exception
+ val m = m0 orElse m1 orElse m2
+
+ "stop at first successful handle" in {
+ m.handle(exc) must beTrue
+
+ there was one(m0).handle(exc)
+ there was no(m1).handle(exc)
+ there was no(m2).handle(exc)
+
+ m0.handle(any) returns false
+
+ m.handle(exc) must beTrue
+ there were two(m0).handle(exc)
+ there was one(m1).handle(exc)
+ there was no(m2).handle(exc)
+ }
+
+ "fail when no nothing got handled" in {
+ Seq(m0, m1, m2) foreach { _.handle(any) returns false }
+ m.handle(exc) must beFalse
+ Seq(m0, m1, m2) foreach { m => there was one(m).handle(exc) }
+ }
+
+ "wrap Monitor exceptions and pass them on" in {
+ val rte = new RuntimeException("really bad news")
+ m0.handle(any) throws rte
+ m.handle(exc) must beTrue
+ there was one(m0).handle(exc)
+ there was one(m1).handle(MonitorException(exc, rte))
+ }
+ }
+
+ "Monitor#andThen" should {
+ val m0, m1 = spy(new MockMonitor)
+ m0.handle(any) returns true
+ m1.handle(any) returns true
+ val m = m0 andThen m1
+ val exc = new Exception
+
+ "run all monitors" in {
+ m.handle(exc) must beTrue
+ there was one(m0).handle(exc)
+ there was one(m1).handle(exc)
+ }
+
+ "be succcessful when any underlying monitor is" in {
+ m0.handle(any) returns false
+ m.handle(exc) must beTrue
+ m1.handle(any) returns false
+ m.handle(exc) must beFalse
+ }
+
+ "wrap Monitor exceptions and pass them on" in {
+ val rte = new RuntimeException("really bad news")
+ m0.handle(any) throws rte
+ m.handle(exc) must beTrue
+ there was one(m0).handle(exc)
+ there was one(m1).handle(MonitorException(exc, rte))
+ }
+
+ "fail if both monitors throw" in {
+ val rte = new RuntimeException("really bad news")
+ m0.handle(any) throws rte
+ m1.handle(any) throws rte
+ m.handle(exc) must beFalse
+ }
+ }
+
+ "Monitor.get, Monitor.set()" should {
+ val m = spy(new MockMonitor)
+ m.handle(any) returns true
+
+ "maintain current monitor" in Monitor.restore {
+ Monitor.set(m)
+ Monitor.get mustBe m
+ }
+ }
+
+ "Monitor.handle" should {
+ val m = spy(new MockMonitor)
+ m.handle(any) returns true
+
+ "dispatch to current monitor" in Monitor.restore {
+ val exc = new Exception
+ Monitor.set(m)
+ Monitor.handle(exc)
+ there was one(m).handle(exc)
+ }
+ }
+
+ "Monitor.restore" should {
+ "restore current configuration" in {
+ val orig = Monitor.get
+ Monitor.restore {
+ Monitor.set(mock[Monitor])
+ }
+ Monitor.get mustBe orig
+ }
+ }
+}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment