-
-
Save Daenyth/7795133b3471da32d3121fcf30994484 to your computer and use it in GitHub Desktop.
cats-effect dropwizard metrics jumping off point
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics.metrics.dropwizard | |
import com.codahale.metrics.MetricRegistry | |
import nl.grons.metrics4.scala.{ | |
HdrMetricBuilder, | |
InstrumentedBuilder, | |
MetricBuilder, | |
MetricName | |
} | |
import teikametrics.metrics.Metrics | |
import scala.concurrent.duration.FiniteDuration | |
/** Base trait for components that publish Dropwizard metrics through metrics-scala.
  *
  * Implementers provide [[instrumentBase]]; every metric created through
  * `metrics` is registered in [[DropwizardMetrics.metricRegistry]] under the
  * base name derived from it.
  */
trait Instrumented extends InstrumentedBuilder {

  /** Base name for all metrics of this instance, derived lazily from [[instrumentBase]]. */
  override lazy val metricBaseName: MetricName = instrumentBase() match {
    case Left(clazz) => MetricName(clazz)
    case Right(name) => MetricName(name)
  }

  /** Source of the metric base name: an owning class (`Left`) or an explicit string (`Right`). */
  protected def instrumentBase(): Either[Class[_], String]

  // see: https://github.com/erikvanoosten/metrics-scala/blob/master/docs/Hdrhistogram.md#step-2-override-the-metric-builder
  override lazy protected val metricBuilder: HdrMetricBuilder =
    new HdrMetricBuilder(metricBaseName, metricRegistry, resetAtSnapshot = true)

  /** Builder used to create timers, counters and histograms.
    *
    * Returns the shared HDR-backed `metricBuilder`. Constructing a plain
    * `MetricBuilder` here instead would bypass the override above and allocate
    * a new builder on every call.
    */
  override def metrics: MetricBuilder = metricBuilder

  /** Registry in which the metrics of this instance are registered. */
  override val metricRegistry: MetricRegistry = DropwizardMetrics.metricRegistry
}
/** Holds the shared Dropwizard registry and the [[Metrics]] backend that writes into it. */
object DropwizardMetrics {

  /** Registry used by every [[Instrumented]] instance; created on first access. */
  lazy val metricRegistry = new MetricRegistry

  /** Dropwizard-backed [[Metrics]] implementation registered under an empty base name. */
  lazy val Instance = new Instrumented with Metrics {
    override protected val instrumentBase = Right("")

    override def counter(key: String, times: Int): Unit = {
      val target = metrics.counter(key)
      target += times
    }

    override def histogram(key: String, value: Long): Unit = {
      val target = metrics.histogram(key)
      target += value
    }

    override def timer(key: String, duration: FiniteDuration): Unit =
      metrics.timer(key).update(duration)

    // Bumps the optional success counter first, then records the duration.
    override protected[metrics] def timerResultSuccess(
        key: String,
        duration: FiniteDuration,
        successKey: Option[String]): Unit = {
      successKey.foreach { name =>
        metrics.counter(name).inc()
      }
      metrics.timer(key).update(duration)
    }

    // A failed timed operation is only counted; no duration is recorded for it.
    override protected[metrics] def timerResultFailure(key: String): Unit =
      errorCounter(key)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics.metrics | |
import akka.actor.ActorSystem | |
import cats.MonadError | |
import teikametrics.Settings | |
import teikametrics.metrics.dropwizard.DropwizardMetrics | |
import scala.concurrent.duration.FiniteDuration | |
import scala.language.higherKinds | |
/** Side-effecting metrics API: timers, counters and histograms addressed by string key.
  *
  * Every recording method returns `Unit` and performs its effect immediately.
  */
trait Metrics {

  /** Suffix that marks a key as an error counter. */
  def errorSuffix: String = "_error"

  /** Records an already-measured `duration` under `key`. */
  def timer(key: String, duration: FiniteDuration): Unit

  /** Times `f` like [[timerWithResultCounter]] but never increments a success counter. */
  final def timer[T, M[_], E](key: String)(f: => M[T])(
      implicit monad: MonadError[M, E]): M[T] =
    timerWithResultCounter[T, M, E](key, (_: T) => None)(f)

  /** Times the effect `f` and reports the outcome.
    *
    * On success the elapsed time is passed to `timerResultSuccess` together
    * with the optional counter key computed from the result. On failure only
    * `timerResultFailure` is invoked, so no duration is recorded. The original
    * result or error of `f` is returned unchanged.
    *
    * The start timestamp is read when this method is called, outside of `M`.
    * For an `M` that defers execution, the measurement therefore begins when
    * the effect is constructed rather than when it is run.
    *
    * @param key        timer key
    * @param counterKey maps a successful result to an optional counter key to increment
    * @param f          effect to time
    */
  final def timerWithResultCounter[T, M[_], E](
      key: String,
      counterKey: T => Option[String]
  )(
      f: => M[T]
  )(
      implicit monad: MonadError[M, E]
  ): M[T] = {
    // nanoTime is monotonic, so the elapsed time cannot go negative or jump
    // when the wall clock is adjusted, unlike currentTimeMillis.
    val start: Long = System.nanoTime
    monad.flatMap(monad.attempt(f)) { result =>
      result match {
        case Right(r) =>
          val elapsed = FiniteDuration(
            System.nanoTime - start,
            java.util.concurrent.TimeUnit.NANOSECONDS)
          timerResultSuccess(key, elapsed, counterKey(r))
        case Left(_) =>
          timerResultFailure(key)
      }
      monad.fromEither(result)
    }
  }

  /** Backend hook invoked after a timed effect succeeded. */
  protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): Unit

  /** Backend hook invoked after a timed effect failed. */
  protected[metrics] def timerResultFailure(key: String): Unit

  /** Adds `times` to the counter named `key`. */
  def counter(key: String, times: Int = 1): Unit

  /** Adds `times` to the error counter for `key`; [[errorSuffix]] is appended unless already present. */
  def errorCounter(key: String, times: Int = 1): Unit =
    counter(if (key.endsWith(errorSuffix)) key else key + errorSuffix, times)

  /** Records `value` in the histogram named `key`. */
  def histogram(key: String, value: Long): Unit
}
/** Factory methods and supporting types for [[Metrics]]. */
object Metrics {

  /** Identifies a metrics backend by its configuration name. */
  sealed abstract class Implementation(val name: String)
  case object Dropwizard extends Implementation("dropwizard")
  case object InMemory extends Implementation("in-memory")
  case object NoOp extends Implementation("noop")

  object Implementation {

    /** Resolves a backend from its name, ignoring surrounding whitespace and case.
      *
      * Throws a `RuntimeException` when the name matches no known backend.
      */
    def apply(name: String): Implementation = {
      val normalized = name.trim.toLowerCase
      if (normalized == Dropwizard.name) Dropwizard
      else if (normalized == InMemory.name) InMemory
      else if (normalized == NoOp.name) NoOp
      else throw new RuntimeException(s"Unknown metrics implementation: $name")
    }
  }

  /** Wraps `backend` so that every key it receives is prefixed with `prefix`. */
  private def prefixed(backend: Metrics, prefix: String): Metrics =
    new MetricsNameModifier(backend, Some(prefix))

  /** Builds the backend selected by the actor system's settings; the no-op backend ignores `prefix`. */
  def apply(prefix: String = "")(implicit system: ActorSystem): Metrics = {
    val selected = Settings(system).metricsImplementation
    selected match {
      case NoOp       => NoOpMetrics
      case InMemory   => prefixed(InMemoryMetrics, prefix)
      case Dropwizard => prefixed(DropwizardMetrics.Instance, prefix)
    }
  }

  /** In-memory backed instance that needs no actor system. */
  def test(prefix: String = ""): Metrics =
    prefixed(InMemoryMetrics, prefix)

  /** Adds `withPrefix` to any [[Metrics]] instance. */
  implicit class MetricsUtils(val metrics: Metrics) extends AnyVal {
    def withPrefix(prefix: String): Metrics =
      new MetricsNameModifier(metrics, Some(prefix))
  }

  /** Decorator that rewrites each key with `safePrefix` before delegating to `backend`. */
  private class MetricsNameModifier(
      backend: Metrics,
      override protected val rawPrefix: Option[String] = None
  ) extends Metrics with MetricsPrefixSupport {

    override def timer(key: String, duration: FiniteDuration): Unit = {
      val fullKey = safePrefix(key)
      backend.timer(fullKey, duration)
    }

    override def counter(key: String, times: Int): Unit = {
      val fullKey = safePrefix(key)
      backend.counter(fullKey, times)
    }

    override def histogram(key: String, value: Long): Unit = {
      val fullKey = safePrefix(key)
      backend.histogram(fullKey, value)
    }

    override protected[metrics] def timerResultSuccess(
        key: String,
        duration: FiniteDuration,
        successKey: Option[String]): Unit = {
      val fullKey = safePrefix(key)
      val fullSuccessKey = successKey.map(safePrefix)
      backend.timerResultSuccess(fullKey, duration, fullSuccessKey)
    }

    override protected[metrics] def timerResultFailure(key: String): Unit = {
      val fullKey = safePrefix(key)
      backend.timerResultFailure(fullKey)
    }
  }
}
/** [[Metrics]] backend that keeps recorded values in concurrent in-process maps. */
object InMemoryMetrics extends Metrics {
  import java.util.concurrent.atomic.AtomicLong
  import scala.collection._

  // last-value only for simplicity
  val timerData: concurrent.Map[String, FiniteDuration] =
    concurrent.TrieMap.empty[String, FiniteDuration]

  val counterData: concurrent.Map[String, AtomicLong] =
    concurrent.TrieMap.empty[String, AtomicLong]

  // last-value only for simplicity
  val histogramData: concurrent.Map[String, Long] =
    concurrent.TrieMap.empty[String, Long]

  /** Adds `delta` to the counter for `key`, creating the counter at zero if absent. */
  private def bump(key: String, delta: Long): Unit = {
    counterData.getOrElseUpdate(key, new AtomicLong).addAndGet(delta)
    ()
  }

  override def timer(key: String, duration: FiniteDuration): Unit =
    timerData.update(key, duration)

  override def counter(key: String, times: Int): Unit =
    bump(key, times.toLong)

  override def histogram(key: String, value: Long): Unit =
    histogramData.update(key, value)

  // Increments the optional success counter, then stores the latest duration.
  override protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]): Unit = {
    successKey.foreach { name =>
      bump(name, 1L)
    }
    timerData.update(key, duration)
  }

  // Failures are only counted; no duration is stored for them.
  override protected[metrics] def timerResultFailure(key: String): Unit =
    errorCounter(key)

  /** Removes every recorded timer, counter and histogram value. */
  def reset(): Unit = {
    timerData.clear()
    counterData.clear()
    histogramData.clear()
  }
}
/** [[Metrics]] backend that discards every measurement. */
object NoOpMetrics extends Metrics {

  override def timer(key: String, duration: FiniteDuration): Unit = {}

  override def counter(key: String, times: Int): Unit = {}

  override def histogram(key: String, value: Long): Unit = {}

  override protected[metrics] def timerResultSuccess(
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]): Unit = {}

  override protected[metrics] def timerResultFailure(key: String): Unit = {}
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics.metrics | |
import akka.actor.ActorSystem | |
import cats.effect.Sync | |
import scala.concurrent.duration.FiniteDuration | |
import cats.implicits._ | |
/** Constructors for [[MetricsF]]. */
object MetricsF {

  // Return in F because creating Metrics is shared state
  def apply[F[_]: Sync](prefix: String = "")(
      implicit system: ActorSystem): F[MetricsF] =
    Sync[F].delay {
      fromMetrics(Metrics(prefix))
    }

  /** Wraps an existing impure [[Metrics]] instance; each call on the result is suspended in `F`. */
  def fromMetrics(m: Metrics): MetricsF = {
    val wrapped = new UnsafeMetricsWrapper(m)
    wrapped
  }
}
/** Effect-suspended metrics API: every operation is returned as an `F[_]` value with a `Sync` instance. */
trait MetricsF {

  /** Times `f` and returns its result, discarding the measured duration. */
  final def timer[F[_]: Sync, A](key: String)(f: F[A]): F[A] =
    timerWithDuration(key)(f).map(_._2)

  /** Times `f` and returns the measured duration together with its result. */
  final def timerWithDuration[F[_]: Sync, A](key: String)(
      f: F[A]): F[(FiniteDuration, A)] =
    timerWithResultCounter[F, A](key, _ => None)(f)

  /** Times `fa` and reports the outcome.
    *
    * On success the elapsed time is passed to `timerResultSuccess` together
    * with the optional counter key computed from the result. On failure only
    * `timerResultFailure` is invoked, and the original error is re-raised
    * afterwards.
    *
    * @param key        timer key
    * @param counterKey maps a successful result to an optional counter key to increment
    * @param fa         effect to time
    * @return the elapsed time paired with the result of `fa`
    */
  final def timerWithResultCounter[F[_]: Sync, A](
      key: String,
      counterKey: A => Option[String]
  )(
      fa: F[A]
  ): F[(FiniteDuration, A)] =
    for {
      // nanoTime is monotonic, so the elapsed time cannot go negative or jump
      // when the wall clock is adjusted, unlike currentTimeMillis.
      start <- Sync[F].delay(System.nanoTime)
      attempt <- fa.attempt
      elapsed <- Sync[F].delay(System.nanoTime - start)
      duration = FiniteDuration(elapsed, java.util.concurrent.TimeUnit.NANOSECONDS)
      _ <- attempt match {
        case Right(r) =>
          timerResultSuccess(
            key,
            duration,
            counterKey(r)
          )
        case Left(_) =>
          timerResultFailure(key)
      }
      result <- Sync[F].fromEither(attempt)
    } yield duration -> result

  /** Backend hook invoked after a timed effect succeeded. */
  protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): F[Unit]

  /** Backend hook invoked after a timed effect failed. */
  protected[metrics] def timerResultFailure[F[_]: Sync](key: String): F[Unit]

  /** Records an already-measured `duration` under `key`. */
  def timer[F[_]: Sync](key: String, duration: FiniteDuration): F[Unit]

  /** Adds `times` to the counter named `key`. */
  def counter[F[_]: Sync](key: String, times: Int = 1): F[Unit]

  /** Adds `times` to the error counter associated with `key`. */
  def errorCounter[F[_]: Sync](key: String, times: Int = 1): F[Unit]

  /** Records `value` in the histogram named `key`. */
  def histogram[F[_]: Sync](key: String, value: Long): F[Unit]

  /** Returns a view of this instance whose keys are prefixed with `prefix`. */
  final def withPrefix(prefix: String): MetricsF =
    new MetricsFNameModifier(this, Some(prefix))
}
/** [[MetricsF]] backed by an impure [[Metrics]] instance; each call is deferred with `Sync.delay`. */
class UnsafeMetricsWrapper private[metrics] (m: Metrics) extends MetricsF {

  /** Defers a side-effecting call on the wrapped instance until `F` is evaluated. */
  private def suspend[F[_]](action: => Unit)(implicit F: Sync[F]): F[Unit] =
    F.delay(action)

  def timer[F[_]: Sync](key: String, duration: FiniteDuration): F[Unit] =
    suspend[F](m.timer(key, duration))

  def counter[F[_]: Sync](key: String, times: Int = 1): F[Unit] =
    suspend[F](m.counter(key, times))

  def errorCounter[F[_]: Sync](key: String, times: Int = 1): F[Unit] =
    suspend[F](m.errorCounter(key, times))

  def histogram[F[_]: Sync](key: String, value: Long): F[Unit] =
    suspend[F](m.histogram(key, value))

  protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]
  ): F[Unit] =
    suspend[F](m.timerResultSuccess(key, duration, successKey))

  protected[metrics] def timerResultFailure[F[_]: Sync](key: String): F[Unit] =
    suspend[F](m.timerResultFailure(key))
}
/** Exists only for code reuse between the MetricsF and impure Metrics prefix classes.
  *
  * Normalizes metric keys (surrounding whitespace and leading/trailing dots
  * removed) and prepends a normalized, dot-terminated prefix.
  *
  * `prefix` is a strict `val` computed from `rawPrefix` while this trait is
  * initialized, so implementers must provide `rawPrefix` through a constructor
  * parameter or a `def`, not through a plain `val` in the class body.
  */
private[metrics] trait MetricsPrefixSupport {

  /** Prefix as supplied by the caller, before normalization. */
  protected def rawPrefix: Option[String]

  /** Returns the normalized `value` with the prefix prepended.
    *
    * A value that already starts with the prefix is returned without adding
    * it a second time.
    */
  protected final def safePrefix(value: String): String = {
    val sanitized = sanitizeKey(value)
    if (sanitized.startsWith(prefix)) sanitized else prefix + sanitized
  }

  /** Removes the longest suffix of `from` whose characters all satisfy `p`. */
  private def dropRightWhile(from: String)(p: Char => Boolean): String =
    // lastIndexWhere yields -1 when every character matches, giving "".
    from.substring(0, from.lastIndexWhere(c => !p(c)) + 1)

  /** Trims surrounding whitespace, then strips leading and trailing dots. */
  private def sanitizeKey(key: String): String =
    dropRightWhile(key.trim.dropWhile(_ == '.'))(_ == '.')

  // Sanitize before the emptiness check: a raw prefix made only of dots
  // (e.g. ".") must yield no prefix rather than a bare "." that would give
  // every key a leading dot.
  private val prefix = rawPrefix
    .map(sanitizeKey)
    .filter(_.nonEmpty)
    .map(_ + ".")
    .getOrElse("")
}
/** [[MetricsF]] decorator that rewrites every key with `safePrefix` before delegating to `backend`. */
private class MetricsFNameModifier(
    backend: MetricsF,
    override protected val rawPrefix: Option[String] = None
) extends MetricsF with MetricsPrefixSupport {

  override def timer[F[_]: Sync](key: String,
                                 duration: FiniteDuration): F[Unit] = {
    val fullKey = safePrefix(key)
    backend.timer(fullKey, duration)
  }

  override def counter[F[_]: Sync](key: String, times: Int): F[Unit] = {
    val fullKey = safePrefix(key)
    backend.counter(fullKey, times)
  }

  override def errorCounter[F[_]: Sync](key: String, times: Int): F[Unit] = {
    val fullKey = safePrefix(key)
    backend.errorCounter(fullKey, times)
  }

  override def histogram[F[_]: Sync](key: String, value: Long): F[Unit] = {
    val fullKey = safePrefix(key)
    backend.histogram(fullKey, value)
  }

  override protected[metrics] def timerResultSuccess[F[_]: Sync](
      key: String,
      duration: FiniteDuration,
      successKey: Option[String]): F[Unit] = {
    val fullKey = safePrefix(key)
    val fullSuccessKey = successKey.map(safePrefix)
    backend.timerResultSuccess(fullKey, duration, fullSuccessKey)
  }

  override protected[metrics] def timerResultFailure[F[_]: Sync](
      key: String): F[Unit] = {
    val fullKey = safePrefix(key)
    backend.timerResultFailure(fullKey)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment