Skip to content

Instantly share code, notes, and snippets.

@ndchandar
Last active June 14, 2021 05:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ndchandar/0c54f348a72308d3abb1741f311c650c to your computer and use it in GitHub Desktop.
Save ndchandar/0c54f348a72308d3abb1741f311c650c to your computer and use it in GitHub Desktop.
Context Switching Issues With Kamon 2.2.0
package com.mycompany
import cats.effect.implicits._
import cats.effect.{Async, Sync}
import cats.implicits._
import kamon.Kamon
import java.util.concurrent.{CompletionException, CompletionStage}
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Left, Right}
object IOUtils {
def fromCompletionStage[F[_], A](makeCf: => CompletionStage[A])(implicit F: Async[F]): F[A] =
KamonUtils.withContext(Kamon.currentContext()).use { _ =>
F.async_[A] { cb =>
makeCf.handle[Unit](
(result: A, err: Throwable) =>
err match {
case null =>
cb(Right(result))
case ex: CompletionException if ex.getCause ne null =>
cb(Left(ex.getCause))
case ex =>
cb(Left(ex))
}
)
}
}
def fromFuture[F[_], A](fut: => Future[A])(implicit F: Async[F]): F[A] =
KamonUtils.withContext(Kamon.currentContext()).use(_ => F.fromFuture(F.delay(fut)))
}
package com.mycompany
import cats.effect.kernel.Async
import cats.effect.{Resource, Spawn, Sync}
import cats.syntax.all._
import kamon.Kamon
import kamon.context.Context
import kamon.trace.{Span, SpanBuilder}
object KamonUtils {
def withSpan[F[_]](spanBuilder: SpanBuilder)(implicit F: Sync[F]): Resource[F, Unit] =
Resource
.make(F.delay {
val span = spanBuilder.start()
val scope = Kamon.storeContext(Kamon.currentContext().withEntry(Span.Key, span).withTags(spanBuilder.tags()))
(scope, span)
}) {
case (scope, span) =>
F.delay {
scope.close()
span.finish()
}
}.void
def withContext[F[_]](context: Context)(implicit F: Sync[F]): Resource[F, Unit] =
Resource.make(F.delay(Kamon.storeContext(context)))(scope => F.delay(scope.close())).as(())
}
package com.mycompany
import akka.actor.ActorSystem
import cats.effect.{IO, IOApp, Resource}
import com.samsung.iotcloud.mqtt.metrics.KamonMetrics
import com.typesafe.scalalogging.Logger
import kamon.Kamon
import java.util.UUID
import java.util.concurrent.CompletableFuture
import scala.concurrent.{ExecutionContext, Future}
// Example to demonstrate Kamon losing context when switching across CompletableFuture/Scala Futures/Cats Effect IO
object Test1 extends IOApp.Simple {
private val logger = Logger(this.getClass)
override def run: IO[Unit] = {
Kamon.init()
val test = for {
system <- Resource.make(IO.delay(ActorSystem("my-test-classic-system")))(
system => IO.fromFuture(IO.delay(system.terminate())).as(())
)
_ <- KamonUtils.withSpan[IO](
Kamon
.spanBuilder("my-span-1")
.tag("userName", "user1")
.tag("clientId", "client1")
.tag("loggingId", UUID.randomUUID().toString)
.ignoreParentFromContext()
)
_ <- Resource.eval(for {
// Context is not lost
_ <- IO.delay(logger.debug(s"current span ${Kamon.currentSpan()}, context: ${Kamon.currentContext()}"))
// Context is not lost
_ <- IO.delay(logger.debug(s"current span2 ${Kamon.currentSpan()}, context: ${Kamon.currentContext()}"))
// Context is not lost when using the default global Execution Context
// implicit0(ec: ExecutionContext) = scala.concurrent.ExecutionContext.Implicits.global
// Context is lost across scala future switching when using akka dispatcher
implicit0(ec: ExecutionContext) = system.dispatcher
// Context gets lost when using akka dispatcher
_ <- IOUtils.fromFuture[IO, Unit](
Future(
logger.debug(
s"In Scala Future. current span3: ${Kamon.currentSpan()}, context: ${Kamon.currentContext()}"
)
)
)
// Context gets lost when using akka dispatcher
_ <- IOUtils.fromFuture[IO, Unit](
Future(
logger.debug(s"In future. current span4 ${Kamon.currentSpan()}, context: ${Kamon.currentContext()}")
)
)
_ <- IO.delay(logger.debug(s"Current span5: ${Kamon.currentSpan()}, context: ${Kamon.currentContext()}"))
} yield ())
} yield ()
test.use(_ => IO.unit).guarantee(IO.delay(Kamon.stop()))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment