Skip to content

Instantly share code, notes, and snippets.

@bancek
Created July 18, 2014 15:46
Show Gist options
  • Save bancek/c0be8a716ef3144dcd5b to your computer and use it in GitHub Desktop.
Save bancek/c0be8a716ef3144dcd5b to your computer and use it in GitHub Desktop.
Finagle Thrift Statsd filter
package net.koofr.finagle.thrift.stats
import com.twitter.finagle.SimpleFilter
import org.apache.thrift.protocol.TBinaryProtocol
import com.twitter.finagle.Service
import org.apache.thrift.transport.TMemoryInputTransport
import play.modules.statsd.api.Statsd
import com.twitter.finagle.thrift.ThriftClientRequest
import play.api.Logger
// Usage:
//
// val stats = new ClientThriftStats(serviceName)
// val serviceWithStats = stats andThen service
abstract class ThriftStats[T](typ: String, serviceName: String) extends SimpleFilter[T, Array[Byte]] {
val factory = new TBinaryProtocol.Factory()
def getData(request: T): Array[Byte]
def apply(request: T, service: Service[T, Array[Byte]]) = {
val start = System.currentTimeMillis()
val transport = new TMemoryInputTransport(getData(request))
val protocol = new TBinaryProtocol.Factory().getProtocol(transport)
val msg = protocol.readMessageBegin()
val nameLower = msg.name.toLowerCase
val keys = Seq(
s"thrift.${typ}.${serviceName}.${nameLower}.",
s"thrift.${typ}.${serviceName}.all.",
s"thrift.${typ}.all."
)
keys.map(_ + "request") foreach (Statsd.increment(_))
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower} request")
val resp = service(request)
resp respond { respTry =>
val time = System.currentTimeMillis() - start
keys.map(_ + "time") foreach (Statsd.timing(_, time))
keys.map(_ + "response.done") foreach (Statsd.increment(_))
respTry match {
case com.twitter.util.Return(r) =>
keys.map(_ + "response.success") foreach (Statsd.increment(_))
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower}: success (${time} ms)")
case com.twitter.util.Throw(e) =>
keys.map(_ + "response.error") foreach (Statsd.increment(_))
Logger.trace(s"Thrift ${serviceName} ${typ} ${nameLower}: error (${time} ms)")
}
}
resp
}
}
class ServerThriftStats(serviceName: String) extends ThriftStats[Array[Byte]]("server", serviceName) {
def getData(request: Array[Byte]) = request
}
class ClientThriftStats(serviceName: String) extends ThriftStats[ThriftClientRequest]("client", serviceName) {
def getData(request: ThriftClientRequest) = request.message
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment