Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created July 20, 2018 21:07
Show Gist options
  • Save Daenyth/7795133b3471da32d3121fcf30994484 to your computer and use it in GitHub Desktop.
Save Daenyth/7795133b3471da32d3121fcf30994484 to your computer and use it in GitHub Desktop.
cats-effect dropwizard metrics jumping off point
package teikametrics.metrics.dropwizard
import com.codahale.metrics.MetricRegistry
import nl.grons.metrics4.scala.{
HdrMetricBuilder,
InstrumentedBuilder,
MetricBuilder,
MetricName
}
import teikametrics.metrics.Metrics
import scala.concurrent.duration.FiniteDuration
/** Mix-in that connects metrics-scala's `InstrumentedBuilder` to the shared
  * `DropwizardMetrics.metricRegistry` and to an HdrHistogram-backed metric builder.
  *
  * Implementers only supply [[instrumentBase]], which decides the base name under
  * which every metric created through `metrics` is registered.
  */
trait Instrumented extends InstrumentedBuilder {

  /** Owner of the metrics: `Left(clazz)` derives the base name from a class,
    * `Right(name)` uses the given string as the base name.
    */
  protected def instrumentBase(): Either[Class[_], String]

  // Lazy so that it can safely call `instrumentBase()` even when the implementer
  // defines it as a `val` that is initialized after this trait.
  override lazy val metricBaseName: MetricName = instrumentBase() match {
    case Right(name) => MetricName(name)
    case Left(clazz) => MetricName(clazz)
  }

  // see: https://github.com/erikvanoosten/metrics-scala/blob/master/docs/Hdrhistogram.md#step-2-override-the-metric-builder
  override lazy protected val metricBuilder =
    new HdrMetricBuilder(metricBaseName, metricRegistry, resetAtSnapshot = true)

  // Hand out the HDR builder configured above. Previously this returned a brand
  // new plain `MetricBuilder` on every call, which silently bypassed the
  // `HdrMetricBuilder` override and allocated a builder per metric access.
  override def metrics: MetricBuilder = metricBuilder

  override val metricRegistry: MetricRegistry = DropwizardMetrics.metricRegistry
}
/** Dropwizard-backed implementation of `Metrics`, plus the registry it reports to. */
object DropwizardMetrics {

  /** Registry that `Instrumented` registers its metrics with; created on first access. */
  lazy val metricRegistry = new com.codahale.metrics.MetricRegistry

  /** Lazily created `Metrics` instance whose metric base name is the empty string. */
  lazy val Instance = new Instrumented with Metrics {

    override protected val instrumentBase = Right("")

    /** Adds `times` to the counter registered under `key`. */
    override def counter(key: String, times: Int): Unit =
      metrics.counter(key) += times

    /** Records `value` in the histogram registered under `key`. */
    override def histogram(key: String, value: Long): Unit =
      metrics.histogram(key) += value

    /** Records `duration` in the timer registered under `key`. */
    override def timer(key: String, duration: FiniteDuration): Unit =
      metrics.timer(key).update(duration)

    /** Bumps the optional success counter first, then records the elapsed time. */
    override protected[metrics] def timerResultSuccess(
        key: String,
        duration: FiniteDuration,
        successKey: Option[String]
    ): Unit = {
      successKey.foreach { counterName =>
        metrics.counter(counterName).inc()
      }
      metrics.timer(key).update(duration)
    }

    /** A failed timed operation is reported only through the error counter for `key`. */
    override protected[metrics] def timerResultFailure(key: String): Unit =
      errorCounter(key)
  }
}
package teikametrics.metrics
import akka.actor.ActorSystem
import cats.MonadError
import teikametrics.Settings
import teikametrics.metrics.dropwizard.DropwizardMetrics
import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
/** Side-effecting metrics facade: timers, counters and histograms addressed by string key.
  *
  * Backends implement the abstract `timer`, `counter`, `histogram`,
  * `timerResultSuccess` and `timerResultFailure` members; the timing combinators
  * and `errorCounter` are built on top of those.
  */
trait Metrics {

  /** Suffix that [[errorCounter]] appends to keys that do not already end with it. */
  def errorSuffix: String = "_error"

  /** Records an already-measured `duration` under `key`. */
  def timer(key: String, duration: FiniteDuration): Unit

  /** Times the computation `f`; equivalent to [[timerWithResultCounter]] with no success counter. */
  final def timer[T, M[_], E](key: String)(f: => M[T])(
      implicit monad: MonadError[M, E]): M[T] =
    timerWithResultCounter[T, M, E](key, (_: T) => None)(f)

  /** Times `f` and reports the outcome to the backend.
    *
    * On success the elapsed time (whole milliseconds) is passed to
    * `timerResultSuccess` together with `counterKey(result)`; on failure only
    * `timerResultFailure(key)` is invoked. The original outcome of `f` is
    * returned unchanged in either case.
    *
    * Note: the start instant is captured when this method is called, not when
    * `M` is eventually run, so for lazily evaluated `M` the measurement also
    * includes the time between construction and execution.
    *
    * @param key        timer key
    * @param counterKey derives an optional counter key from a successful result
    * @param f          the computation to time
    */
  final def timerWithResultCounter[T, M[_], E](
      key: String,
      counterKey: T => Option[String]
  )(
      f: => M[T]
  )(
      implicit monad: MonadError[M, E]
  ): M[T] = {
    // System.nanoTime is monotonic; System.currentTimeMillis follows the wall
    // clock and can jump (e.g. NTP adjustments), producing wrong or negative durations.
    val startNanos: Long = System.nanoTime()
    monad.flatMap(monad.attempt(f)) { result =>
      result match {
        case Right(value) =>
          // Still reported with millisecond resolution, as before.
          val elapsedMillis = (System.nanoTime() - startNanos) / 1000000L
          timerResultSuccess(
            key,
            FiniteDuration(elapsedMillis, "ms"),
            counterKey(value))
        case Left(_) =>
          timerResultFailure(key)
      }
      // Re-raise the failure / re-wrap the value so callers see the original outcome.
      monad.fromEither(result)
    }
  }

  /** Backend hook invoked after a timed computation succeeded. */
  protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): Unit

  /** Backend hook invoked after a timed computation failed. */
  protected[metrics] def timerResultFailure(key: String): Unit

  /** Adds `times` (default 1) to the counter under `key`. */
  def counter(key: String, times: Int = 1): Unit

  /** Increments the counter for `key` with [[errorSuffix]] appended, unless `key` already ends with it. */
  def errorCounter(key: String, times: Int = 1): Unit =
    counter(if (key.endsWith(errorSuffix)) key else key + errorSuffix, times)

  /** Records `value` in the histogram under `key`. */
  def histogram(key: String, value: Long): Unit
}
/** Factory for [[Metrics]] instances and the catalogue of selectable backends. */
object Metrics {

  /** A selectable metrics backend, identified by its configuration `name`. */
  sealed abstract class Implementation(val name: String)
  case object Dropwizard extends Implementation("dropwizard")
  case object InMemory extends Implementation("in-memory")
  case object NoOp extends Implementation("noop")

  object Implementation {

    /** Resolves a backend from its name, ignoring surrounding whitespace and case.
      *
      * @throws RuntimeException if `name` matches no known implementation
      */
    def apply(name: String): Implementation =
      // Locale.ROOT keeps the lookup independent of the JVM default locale; with
      // e.g. a Turkish locale "IN-MEMORY".toLowerCase would not equal "in-memory".
      name.trim.toLowerCase(java.util.Locale.ROOT) match {
        case Dropwizard.name => Dropwizard
        case InMemory.name => InMemory
        case NoOp.name => NoOp
        case _ =>
          throw new RuntimeException(s"Unknown metrics implementation: $name")
      }
  }

  /** Builds the backend selected by `Settings(system).metricsImplementation`.
    *
    * The Dropwizard and in-memory backends are wrapped so every key is
    * prefixed with `prefix`; the no-op backend is returned as is.
    */
  def apply(prefix: String = "")(implicit system: ActorSystem): Metrics =
    Settings(system).metricsImplementation match {
      case Dropwizard =>
        new MetricsNameModifier(DropwizardMetrics.Instance, Some(prefix))
      case InMemory => new MetricsNameModifier(InMemoryMetrics, Some(prefix))
      case NoOp => NoOpMetrics
    }

  /** In-memory backed instance that needs no `ActorSystem`. */
  def test(prefix: String = ""): Metrics =
    new MetricsNameModifier(InMemoryMetrics, Some(prefix))

  /** Adds `withPrefix` to any [[Metrics]] value. */
  implicit class MetricsUtils(val metrics: Metrics) extends AnyVal {
    def withPrefix(prefix: String): Metrics =
      new MetricsNameModifier(metrics, Some(prefix))
  }

  /** Decorator that passes every key through `safePrefix` before delegating to `backend`. */
  private class MetricsNameModifier(
      backend: Metrics,
      override protected val rawPrefix: Option[String] = None
  ) extends Metrics with MetricsPrefixSupport {

    override protected[metrics] def timerResultSuccess(
        key: String,
        duration: FiniteDuration,
        successKey: Option[String]): Unit =
      backend.timerResultSuccess(safePrefix(key),
                                 duration,
                                 successKey.map(safePrefix))

    override protected[metrics] def timerResultFailure(key: String): Unit =
      backend.timerResultFailure(safePrefix(key))

    override def timer(key: String, duration: FiniteDuration): Unit =
      backend.timer(safePrefix(key), duration)

    override def counter(key: String, times: Int): Unit =
      backend.counter(safePrefix(key), times)

    override def histogram(key: String, value: Long): Unit =
      backend.histogram(safePrefix(key), value)
  }
}
/** `Metrics` backend that keeps everything in concurrent in-process maps. */
object InMemoryMetrics extends Metrics {
  import java.util.concurrent.atomic.AtomicLong
  import scala.collection._

  /** Most recent duration per timer key (last-value only for simplicity). */
  val timerData: concurrent.Map[String, FiniteDuration] =
    concurrent.TrieMap.empty[String, FiniteDuration]

  /** Running total per counter key. */
  val counterData: concurrent.Map[String, AtomicLong] =
    concurrent.TrieMap.empty[String, AtomicLong]

  /** Most recent value per histogram key (last-value only for simplicity). */
  val histogramData: concurrent.Map[String, Long] =
    concurrent.TrieMap.empty[String, Long]

  /** Adds `times` to the counter under `key`, creating it at zero when absent. */
  override def counter(key: String, times: Int): Unit = {
    val total = counterData.getOrElseUpdate(key, new AtomicLong)
    total.addAndGet(times.toLong)
    ()
  }

  /** Overwrites the stored duration for `key`. */
  override def timer(key: String, duration: FiniteDuration): Unit =
    timerData.update(key, duration)

  /** Overwrites the stored histogram value for `key`. */
  override def histogram(key: String, value: Long): Unit =
    histogramData.update(key, value)

  /** Increments the optional success counter by one, then stores the duration. */
  override protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): Unit = {
    successKey.foreach { counterName =>
      counter(counterName, 1)
    }
    timerData.update(key, duration)
  }

  /** A failed timed operation only bumps the error counter for `key`. */
  override protected[metrics] def timerResultFailure(key: String): Unit =
    errorCounter(key)

  /** Empties all three maps. */
  def reset(): Unit = {
    timerData.clear()
    counterData.clear()
    histogramData.clear()
  }
}
/** `Metrics` backend whose abstract operations all discard their input. */
object NoOpMetrics extends Metrics {

  // Public recording operations: nothing is stored.
  override def counter(key: String, times: Int): Unit = ()

  override def histogram(key: String, value: Long): Unit = ()

  override def timer(key: String, duration: FiniteDuration): Unit = ()

  // Hooks used by the timing combinators: likewise ignored.
  override protected[metrics] def timerResultFailure(key: String): Unit = ()

  override protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): Unit = ()
}
package teikametrics.metrics
import akka.actor.ActorSystem
import cats.effect.Sync
import scala.concurrent.duration.FiniteDuration
import cats.implicits._
/** Constructors for the effect-wrapped metrics API. */
object MetricsF {

  /** Wraps an existing impure `Metrics` instance without performing any effect. */
  def fromMetrics(m: Metrics): MetricsF =
    new UnsafeMetricsWrapper(m)

  /** Builds the configured `Metrics` backend for `prefix` and wraps it.
    *
    * Return in F because creating Metrics is shared state.
    */
  def apply[F[_]: Sync](prefix: String = "")(
      implicit system: ActorSystem): F[MetricsF] = {
    val F = Sync[F]
    F.delay(new UnsafeMetricsWrapper(Metrics(prefix)))
  }
}
/** Effect-wrapped counterpart of `Metrics`: every operation returns an `F[_]`
  * (requiring `Sync[F]`) instead of executing immediately.
  */
trait MetricsF {

  /** Times `f`, reports the outcome, and yields only the result of `f`. */
  final def timer[F[_]: Sync, A](key: String)(f: F[A]): F[A] =
    timerWithDuration(key)(f).map(_._2)

  /** Times `f`, reports the outcome, and yields the measured duration with the result. */
  final def timerWithDuration[F[_]: Sync, A](key: String)(
      f: F[A]): F[(FiniteDuration, A)] =
    timerWithResultCounter[F, A](key, _ => None)(f)

  /** Times `fa` and reports the outcome.
    *
    * On success `timerResultSuccess` receives the elapsed time (whole
    * milliseconds) and `counterKey(result)`; on failure `timerResultFailure(key)`
    * runs and the original error is then re-raised in `F`.
    *
    * @param key        timer key
    * @param counterKey derives an optional counter key from a successful result
    * @param fa         the effect to time
    * @return the measured duration paired with the result of `fa`
    */
  final def timerWithResultCounter[F[_]: Sync, A](
      key: String,
      counterKey: A => Option[String]
  )(
      fa: F[A]
  ): F[(FiniteDuration, A)] =
    for {
      // System.nanoTime is monotonic; System.currentTimeMillis follows the wall
      // clock and can jump, producing wrong or negative durations.
      startNanos <- Sync[F].delay(System.nanoTime())
      attempt <- fa.attempt
      elapsedNanos <- Sync[F].delay(System.nanoTime() - startNanos)
      // Still reported with millisecond resolution, as before.
      duration = FiniteDuration(elapsedNanos / 1000000L, "ms")
      _ <- attempt match {
        case Right(r) =>
          timerResultSuccess(
            key,
            duration,
            counterKey(r)
          )
        case Left(_) =>
          timerResultFailure(key)
      }
      // Re-raise the failure (if any) only after it has been reported.
      result <- Sync[F].fromEither(attempt)
    } yield duration -> result

  /** Backend hook run after a timed effect succeeded. */
  protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): F[Unit]

  /** Backend hook run after a timed effect failed. */
  protected[metrics] def timerResultFailure[F[_]: Sync](key: String): F[Unit]

  /** Records an already-measured `duration` under `key`. */
  def timer[F[_]: Sync](key: String, duration: FiniteDuration): F[Unit]

  /** Adds `times` (default 1) to the counter under `key`. */
  def counter[F[_]: Sync](key: String, times: Int = 1): F[Unit]

  /** Adds `times` (default 1) to the error counter associated with `key`. */
  def errorCounter[F[_]: Sync](key: String, times: Int = 1): F[Unit]

  /** Records `value` in the histogram under `key`. */
  def histogram[F[_]: Sync](key: String, value: Long): F[Unit]

  /** Returns a view of this instance whose keys are all prefixed with `prefix`. */
  final def withPrefix(prefix: String): MetricsF =
    new MetricsFNameModifier(this, Some(prefix))
}
/** Adapts an impure `Metrics` instance to `MetricsF` by deferring each call with `Sync[F].delay`. */
class UnsafeMetricsWrapper private[metrics] (m: Metrics) extends MetricsF {

  /** Suspends a side-effecting call on the wrapped instance inside `F`. */
  private def suspend[F[_], A](thunk: => A)(implicit F: Sync[F]): F[A] =
    F.delay(thunk)

  def timer[F[_]: Sync](key: String, duration: FiniteDuration): F[Unit] =
    suspend[F, Unit](m.timer(key, duration))

  def counter[F[_]: Sync](key: String, times: Int = 1): F[Unit] =
    suspend[F, Unit](m.counter(key, times))

  def errorCounter[F[_]: Sync](key: String, times: Int = 1): F[Unit] =
    suspend[F, Unit](m.errorCounter(key, times))

  def histogram[F[_]: Sync](key: String, value: Long): F[Unit] =
    suspend[F, Unit](m.histogram(key, value))

  protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): F[Unit] =
    suspend[F, Unit](m.timerResultSuccess(key, duration, successKey))

  protected[metrics] def timerResultFailure[F[_]: Sync](key: String): F[Unit] =
    suspend[F, Unit](m.timerResultFailure(key))
}
/** Exists only for code reuse between the MetricsF and impure Metrics prefix classes.
  *
  * Keys and prefixes are "sanitized" by trimming whitespace and stripping
  * leading/trailing '.' characters; a non-empty sanitized prefix is joined to
  * keys with a single '.'.
  */
private[metrics] trait MetricsPrefixSupport {

  /** Prefix as supplied by the caller, before sanitizing; `None` means no prefix. */
  protected def rawPrefix: Option[String]

  /** Sanitizes `value` and prepends the prefix unless the sanitized key already starts with it.
    *
    * @param value metric key as passed by the caller
    * @return the sanitized, prefixed key
    */
  protected final def safePrefix(value: String): String = {
    val sanitized = sanitizeKey(value)
    if (sanitized.startsWith(prefix)) sanitized else prefix + sanitized
  }

  /** Trims whitespace, then drops every leading and trailing '.'.
    * Returns "" when nothing but dots (or nothing at all) remains.
    */
  private def sanitizeKey(key: String): String = {
    val trimmed = key.trim
    val first = trimmed.indexWhere(_ != '.')
    // Single pass over the indices instead of repeatedly copying the string.
    if (first < 0) ""
    else trimmed.substring(first, trimmed.lastIndexWhere(_ != '.') + 1)
  }

  // Lazy: a strict val would read `rawPrefix` during trait initialization, which
  // is still null when an implementer defines it as a `val` in its class body.
  // Sanitizing before the emptiness check also stops a dots-only prefix
  // (e.g. "...") from turning into a bare "." prefix.
  private lazy val prefix: String =
    rawPrefix
      .map(sanitizeKey)
      .filter(_.nonEmpty)
      .fold("")(_ + ".")
}
/** `MetricsF` decorator that passes every key through `safePrefix` before delegating to `backend`. */
private class MetricsFNameModifier(
    backend: MetricsF,
    override protected val rawPrefix: Option[String] = None
) extends MetricsF with MetricsPrefixSupport {

  override def timer[F[_]: Sync](key: String,
                                 duration: FiniteDuration): F[Unit] = {
    val prefixedKey = safePrefix(key)
    backend.timer[F](prefixedKey, duration)
  }

  override def counter[F[_]: Sync](key: String, times: Int): F[Unit] = {
    val prefixedKey = safePrefix(key)
    backend.counter[F](prefixedKey, times)
  }

  override def errorCounter[F[_]: Sync](key: String, times: Int): F[Unit] = {
    val prefixedKey = safePrefix(key)
    backend.errorCounter[F](prefixedKey, times)
  }

  override def histogram[F[_]: Sync](key: String, value: Long): F[Unit] = {
    val prefixedKey = safePrefix(key)
    backend.histogram[F](prefixedKey, value)
  }

  // The optional success-counter key is prefixed the same way as the timer key.
  override protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): F[Unit] = {
    val prefixedKey = safePrefix(key)
    val prefixedSuccessKey = successKey.map(name => safePrefix(name))
    backend.timerResultSuccess[F](prefixedKey, duration, prefixedSuccessKey)
  }

  override protected[metrics] def timerResultFailure[F[_]: Sync](
      key: String): F[Unit] = {
    val prefixedKey = safePrefix(key)
    backend.timerResultFailure[F](prefixedKey)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment