Skip to content

Instantly share code, notes, and snippets.

@daggerrz
Created March 2, 2014 17:41
Show Gist options
  • Save daggerrz/9310425 to your computer and use it in GitHub Desktop.
Save daggerrz/9310425 to your computer and use it in GitHub Desktop.
GraphiteReporter
package com.tapad.common.metrics
import java.net.Socket
import java.io._
import com.tapad.common.log.Logging
import com.twitter.ostrich.stats._
import java.util.TimerTask
/**
 * Factory for [[GraphiteReporter]] instances pre-registered with the `Stats` collection
 * (the `Stats` object brought in by the ostrich stats import).
 */
object GraphiteReporter {
  /**
   * Creates a reporter, registers `Stats` with it and returns it.
   *
   * Previously the result of `reporter.add(Stats)` (i.e. `Unit`) was returned, so callers
   * had no handle on the reporter and could not register further collections.
   *
   * @param prefix         forwarded to the reporter; used as the metric name prefix
   * @param host           Graphite host to connect to
   * @param port           Graphite port to connect to
   * @param reportInterval forwarded to the reporter, which uses it as both the initial
   *                       delay and the period of its flush timer
   * @return the newly created reporter
   */
  def apply(prefix: String, host: String, port: Int, reportInterval: Long): GraphiteReporter = {
    val reporter = new GraphiteReporter(prefix, host, port, reportInterval)
    reporter.add(Stats)
    reporter
  }
}
/**
 * Periodically flushes registered stats collections to Graphite.
 *
 * Constructing an instance immediately schedules a recurring flush on a dedicated
 * `java.util.Timer` thread named "GraphiteReporter"; the first flush runs after
 * `reportInterval` and then repeats with the same period.
 *
 * @param prefix         metric name prefix handed to the underlying reporter state
 * @param host           Graphite host
 * @param port           Graphite port
 * @param reportInterval initial delay and period of the flush timer (as interpreted by
 *                       `java.util.Timer.schedule`)
 */
class GraphiteReporter(prefix: String, host: String, port: Int, reportInterval: Long) extends Logging {
  private val state = new GraphiteReporterState(prefix, host, port)

  // Daemon timer: a metrics reporter must not keep the JVM alive on its own. The handle is
  // retained so the schedule can be cancelled via stop().
  private val flushTimer = new java.util.Timer("GraphiteReporter", true)

  flushTimer.schedule(new TimerTask {
    def run(): Unit = {
      try {
        log.debug("Flushing stats to Graphite.")
        state.flush()
        log.debug("Graphite flush completed.")
      } catch {
        // Deliberately broad: an exception escaping a TimerTask terminates the timer thread,
        // which would silently stop all future flushes.
        case t: Throwable => log.warn("Error flushing. Data will be retained for next flush...", t)
      }
    }
  }, reportInterval, reportInterval)

  /**
   * Adds a stats collection to the monitoring queue.
   */
  def add(stats: StatsCollection): Unit = state.add(stats)

  /**
   * Cancels the periodic flush. Collections already registered are left untouched and no
   * final flush is performed.
   */
  def stop(): Unit = flushTimer.cancel()
}
/**
 * Connection and registration state behind [[GraphiteReporter]].
 *
 * Holds the socket/writer pair used to talk to Graphite and the set of stats collections to
 * report. All access to this mutable state goes through `synchronized` blocks on this instance.
 *
 * @param prefix metric name prefix passed to `Graphite.writeGraphiteMsg`
 * @param host   Graphite host
 * @param port   Graphite port
 */
private[metrics] class GraphiteReporterState(prefix: String, host: String, port: Int) extends Logging {
  // null means "not connected"; both fields are always set and cleared together.
  private var socket: Socket = _
  private var writer: OutputStreamWriter = _
  private var collections: Set[StatsCollection] = Set()

  /**
   * Opens a connection unless a usable one already exists.
   *
   * `Socket.isConnected` stays true after a socket has been closed, so `isClosed` is checked
   * as well. The new socket and writer are only published once both have been created.
   */
  private def connect(): Unit = synchronized {
    if (socket == null || socket.isClosed || !socket.isConnected) {
      log.info("Connecting to {}...", host + ":" + port)
      val fresh = new Socket(host, port)
      try {
        // Explicit charset: the platform default must not influence what is sent on the wire.
        writer = new OutputStreamWriter(fresh.getOutputStream, "UTF-8")
        socket = fresh
      } catch {
        case e: IOException =>
          // Best-effort cleanup so the half-open socket is not leaked; the original
          // failure is the one worth reporting.
          try fresh.close() catch { case _: IOException => () }
          throw e
      }
      log.info("Connected.")
    }
  }

  /**
   * Closes the current connection, if any, and resets the state so that the next
   * `connect()` starts from scratch. A failure while closing is logged, not rethrown,
   * so it cannot mask the error that triggered the disconnect.
   */
  private def disconnect(): Unit = synchronized {
    log.info("Disconnecting...")
    try {
      if (socket != null && !socket.isClosed) socket.close()
    } catch {
      case e: IOException => log.warn("Error closing Graphite socket.", e)
    } finally {
      socket = null
      writer = null
    }
    log.info("Disconnected.")
  }

  /**
   * Writes every registered collection to Graphite and clears each collection after it
   * has been written.
   *
   * On an `IOException` the connection is dropped (so the next flush reconnects) and the
   * exception is rethrown; the collection being written at that point is not cleared.
   */
  def flush(): Unit = synchronized {
    connect()
    try {
      val now = System.currentTimeMillis
      collections.foreach { coll =>
        Graphite.writeGraphiteMsg(prefix, now, coll, writer)
        coll.clearAll()
      }
    } catch {
      case e: IOException =>
        disconnect()
        throw e
    }
  }

  /**
   * Registers a collection to be reported on subsequent flushes. Adding the same
   * collection again has no effect because registrations are kept in a `Set`.
   */
  def add(s: StatsCollection): Unit = synchronized {
    collections += s
  }
}
/**
 * Serialises stats collections as text lines of the form `name value epoch`,
 * one metric per line.
 */
object Graphite extends Logging {
  /**
   * Convert a long value into a Graphite line.
   */
  private def toGraphiteLine(name: String, value: Long, epoch: Long): String = {
    name + " " + value + " " + epoch
  }

  /**
   * Convert a double value into a Graphite line, formatting the value with `df`.
   */
  private def toGraphiteLine(name: String, value: Double, epoch: Long)(implicit df: java.text.DecimalFormat): String = {
    name + " " + df.format(value) + " " + epoch
  }

  /**
   * Write a stats collection to a Graphite writer.
   *
   * For every metric the p90/p95/p99 percentiles, the count and the average are emitted as
   * `prefix.name_metric`; counters and gauges are emitted as `prefix.name`. Each line is
   * also logged at debug level. The writer is flushed once after all lines are written.
   *
   * @param prefix    prepended (with a dot) to every metric name
   * @param timestamp time of the report in milliseconds; divided by 1000 for the output
   * @param stats     the collection to report
   * @param out       destination writer; not closed by this method
   */
  def writeGraphiteMsg(prefix: String, timestamp: Long, stats: StatsCollection, out: Writer): Unit = {
    // Pin the format symbols: with the JVM default locale a comma-decimal locale would emit
    // values such as "1,50", which is not a valid numeric value on the wire.
    implicit val df: java.text.DecimalFormat =
      new java.text.DecimalFormat("#0.00", new java.text.DecimalFormatSymbols(java.util.Locale.US))
    val epoch = timestamp / 1000
    val report = stats.get()

    // Labels are spelled out rather than derived via (p * 100).toInt, which depends on
    // floating-point rounding happening to land on a whole number.
    val percentiles = List("p90" -> 0.90, "p95" -> 0.95, "p99" -> 0.99)

    val metrics = report.metrics.toList.flatMap {
      case (name, distribution) =>
        val values =
          percentiles.map { case (label, p) => label -> distribution.histogram.getPercentile(p).toLong } ++
            List("count" -> distribution.histogram.count, "average" -> distribution.average.toLong)
        values.map {
          case (metric, value) =>
            toGraphiteLine(prefix + "." + name + "_" + metric, value, epoch)
        }
    }
    val counters = report.counters.toList.map {
      case (name, value) =>
        toGraphiteLine(prefix + "." + name, value, epoch)
    }
    val gauges = report.gauges.toList.map {
      case (name, value) =>
        toGraphiteLine(prefix + "." + name, value, epoch)
    }

    (metrics ++ counters ++ gauges).foreach { line =>
      log.debug(line)
      out.write(line)
      out.write('\n')
    }
    // One flush for the whole batch instead of one per line.
    out.flush()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment