// Source: GitHub gist nzpr/2f9f9a67bec0fd3551841f2ec7d72124 by @nzpr, created August 27, 2023 07:18.
import cats.effect.kernel.{Outcome, Resource}
import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Sync}
import cats.syntax.all.*
import com.typesafe.config.{Config, ConfigValueFactory}
import kamon.Kamon
import kamon.context.Storage.Scope
import kamon.trace.Span
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
import scala.concurrent.Await
/**
 * Checks that Kamon spans opened from cats-effect code are reported and grouped into traces.
 *
 * The suite starts Kamon with a tracing-friendly configuration, runs several span-producing
 * recursive tasks concurrently and shuts Kamon down afterwards. The resulting traces are meant
 * to be inspected in the configured tracing backend (e.g. Jaeger/Zipkin); the test itself only
 * asserts that the whole program completes.
 */
class KamonTest extends AnyFlatSpec with Matchers {

  /**
   * Resource ensuring that Kamon is initialized and stopped properly.
   *
   * @param configOpt configuration to initialize Kamon with; when empty, `Kamon.init()` is used
   * @return resource that initializes Kamon on acquire and stops it on release
   */
  def kamonResource[F[_]: Sync](configOpt: Option[Config] = None): Resource[F, Unit] = {
    val stopTimeout = 2.seconds
    val start = Sync[F].delay(configOpt.fold(Kamon.init())(config => Kamon.init(config)))
    // Await.result blocks the calling thread, so it is suspended with `blocking` instead of `delay`.
    val stop = Sync[F].blocking(Await.result(Kamon.stop(), stopTimeout))
    Resource.make(start)(_ => stop)
  }

  /**
   * Adaptation of Kamon's `span(operationName: String, component: String)(f: => A): A`
   * to cats-effect.
   *
   * Starts a span, stores a context containing it, runs `f`, and afterwards always finishes
   * the span and closes the context scope. When `f` fails, the span is marked as failed first.
   *
   * NOTE(review): the stored context is assumed to follow the fiber across thread shifts only
   * when Kamon's cats-effect context propagation is active - confirm for the runtime in use.
   *
   * @param opName    operation name of the span
   * @param component component name of the span
   * @param f         effect to run inside the span
   * @return the result of `f`
   */
  def span[F[_]: Sync, A](opName: String, component: String)(f: F[A]): F[A] = {
    val openSpan: F[(Span, Scope)] = Sync[F].delay {
      val started = Kamon.internalSpanBuilder(opName, component).start() // start span
      val ctx = Kamon.currentContext().withEntry(Span.Key, started) // make context with span
      val scope = Kamon.storeContext(ctx) // store context
      (started, scope)
    }

    // All Kamon calls are suspended in one effect; the scope is closed even if marking the
    // span as failed or finishing it throws.
    def closeSpan(opened: Span, scope: Scope, error: Option[Throwable]): F[Unit] =
      Sync[F].delay {
        try {
          error.foreach { err =>
            // getLocalizedMessage may be null, fall back to the exception's string form
            opened.fail(Option(err.getLocalizedMessage).getOrElse(err.toString), err)
          }
          opened.finish() // finish span
        } finally scope.close() // close context
      }

    // open span, do work, ensure span is finished and context is closed
    Sync[F].bracketCase(openSpan)(_ => f) {
      case ((opened, scope), Outcome.Errored(err)) => closeSpan(opened, scope, Some(err))
      case ((opened, scope), _) => closeSpan(opened, scope, None)
    }
  }

  /**
   * Recurse n times, each level running inside its own span.
   *
   * The initial call opens a span as well, so `n + 1` spans (steps `n` down to `0`) are opened,
   * each one inside the previous.
   *
   * @param n     Recursion depth
   * @param recId id of recursion (to easily identify the span in Jaeger/Zipkin)
   * @return effect that completes once every level has finished its simulated work
   */
  def recWithSpan(n: Int, recId: Int): IO[Unit] = {
    val rec = IO.defer {
      val work = IO.sleep(10.millis) // simulate some work
      val recurse = recWithSpan(n - 1, recId).whenA(n > 0) // recurse if n > 0
      work >> recurse
    }
    span(s"recId $recId step $n ", "recursion")(rec)
  }

  // To understand all these configurations look into reference.conf files in Kamon packages
  val cfg: Config = Kamon
    .config()
    // No need for influxdb reporter
    .withValue("kamon.modules.influxdb.enabled", ConfigValueFactory.fromAnyRef(false))
    // okHttp instrumentation fills Jaeger with unwanted spans, so disable it
    .withValue("kamon.instrumentation.okhttp.http-client.tracing.enabled", ConfigValueFactory.fromAnyRef(false))
    // By default Kamon samples traces and does not report each and every one. Switch to always mode.
    .withValue("kamon.trace.sampler", ConfigValueFactory.fromAnyRef("always"))

  // Number of tasks. Kamon should report this number of traces.
  val numTasks = 5

  // Recursion depth of every task; each trace ends up with nestingLvl + 1 spans
  // (steps nestingLvl down to 0).
  val nestingLvl = 9

  "Kamon" should "report all spans and arrange in traces properly" in {
    // kamonResource is the helper defined in this class
    kamonResource[IO](cfg.some)
      .surround {
        // list of tasks to run
        val tasks = (1 to numTasks).map(recWithSpan(nestingLvl, _)).toList
        // Run tasks concurrently to check that context is propagated correctly.
        tasks.parSequence.void
      }
      .unsafeRunSync() shouldBe ()
  }
}
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment