Created
August 27, 2023 07:18
-
-
Save nzpr/2f9f9a67bec0fd3551841f2ec7d72124 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.kernel.{Outcome, Resource} | |
import cats.effect.unsafe.implicits.global | |
import cats.effect.{IO, Sync} | |
import cats.syntax.all.* | |
import com.typesafe.config.{Config, ConfigValueFactory} | |
import kamon.Kamon | |
import kamon.context.Storage.Scope | |
import kamon.trace.Span | |
import org.scalatest.flatspec.AnyFlatSpec | |
import org.scalatest.matchers.should.Matchers | |
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime | |
import scala.concurrent.Await | |
/**
 * Exercises Kamon span creation from cats-effect code: several tasks run concurrently, each
 * opening a chain of nested spans.
 *
 * NOTE(review): the test only asserts that the workload completes; nothing in this class inspects
 * the reported spans, so trace structure presumably has to be checked in the tracing backend.
 */
class KamonTest extends AnyFlatSpec with Matchers {

  /**
   * Resource that initializes Kamon on acquire and stops it on release.
   *
   * @param configOpt configuration handed to `Kamon.init`; when `None` the no-argument
   *                  `Kamon.init()` is used
   * @return a resource whose release waits (up to 2 seconds) for `Kamon.stop()` to complete
   */
  def kamonResource[F[_]: Sync](configOpt: Option[Config] = None): Resource[F, Unit] = {
    val stopTimeout = 2.seconds
    val start       = Sync[F].delay(configOpt.fold(Kamon.init())(config => Kamon.init(config)))
    // Await.result blocks the calling thread, so suspend it with `blocking` rather than `delay`.
    val stop = Sync[F].blocking(Await.result(Kamon.stop(), stopTimeout))
    Resource.make(start)(_ => stop)
  }

  /**
   * Adaptation of Kamon's `span(operationName: String, component: String)(f: => A): A` to
   * cats-effect: runs `f` inside a freshly started span stored in Kamon's current context.
   *
   * @param opName    operation name of the span
   * @param component component name of the span
   * @param f         effect to run while the span is open
   * @return the result of `f`; the span is finished and the context scope closed whether `f`
   *         succeeds, fails or is canceled. On failure the span is marked as failed first.
   */
  def span[F[_]: Sync, A](opName: String, component: String)(f: F[A]): F[A] = {
    val openSpan = Sync[F].delay {
      val started = Kamon.internalSpanBuilder(opName, component).start()    // start span
      val ctx     = Kamon.currentContext().withEntry(Span.Key, started)     // make context with span
      val scope   = Kamon.storeContext(ctx)                                 // store context
      (started, scope)
    }

    def closeSpan(opened: Span, scope: Scope, failure: Option[Throwable]): F[Unit] = Sync[F].delay {
      try {
        // getLocalizedMessage may be null, so fall back to the throwable's string form.
        failure.foreach(err => opened.fail(Option(err.getLocalizedMessage).getOrElse(err.toString), err))
        opened.finish() // finish span
      } finally scope.close() // always close the context scope, even if finishing the span throws
    }

    // open span, do work, ensure span is finished and context is closed
    Sync[F].bracketCase(openSpan)(_ => f) {
      case ((opened, scope), Outcome.Errored(err)) => closeSpan(opened, scope, Some(err))
      case ((opened, scope), _)                    => closeSpan(opened, scope, None)
    }
  }

  /**
   * Recurse `n` times, each level wrapped in its own span. Spans are opened for steps `n` down
   * to `0`, i.e. `n + 1` nested spans in total.
   *
   * @param n     recursion depth
   * @param recId id of recursion (to easily identify the span in Jaeger/Zipkin)
   * @return an effect that completes once this level and all deeper levels have finished
   */
  def recWithSpan(n: Int, recId: Int): IO[Unit] = {
    val rec = IO.defer {
      val work    = IO.sleep(10.millis)                      // simulate some work
      val recurse = recWithSpan(n - 1, recId).whenA(n > 0)   // recurse if n > 0
      work >> recurse
    }
    span(s"recId $recId step $n ", "recursion")(rec)
  }

  // To understand all these configurations look into the reference.conf files in Kamon packages.
  val cfg: Config = Kamon
    .config()
    // No need for the influxdb reporter
    .withValue("kamon.modules.influxdb.enabled", ConfigValueFactory.fromAnyRef(false))
    // okHttp instrumentation fills Jaeger with unwanted spans, so disable it
    .withValue("kamon.instrumentation.okhttp.http-client.tracing.enabled", ConfigValueFactory.fromAnyRef(false))
    // By default Kamon samples traces and does not report every one. Switch to "always" mode.
    .withValue("kamon.trace.sampler", ConfigValueFactory.fromAnyRef("always"))

  // Number of tasks. Kamon should report this number of traces.
  val numTasks: Int = 5

  // Recursion depth of each task. Steps run from nestingLvl down to 0, so each trace should
  // contain nestingLvl + 1 nested spans.
  val nestingLvl: Int = 9

  "Kamon" should "report all spans and arrange in traces properly" in {
    // Use the resource defined in this class; no KamonDiagnostics object is defined in this file.
    kamonResource[IO](cfg.some)
      .surround {
        // list of tasks to run
        val tasks = (1 to numTasks).map(recWithSpan(nestingLvl, _)).toList
        // Run tasks concurrently to check that context is propagated correctly.
        tasks.parSequence.void
      }
      .unsafeRunSync() shouldBe ()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment