Skip to content

Instantly share code, notes, and snippets.

@rossabaker
Last active February 22, 2022 17:24
Show Gist options
  • Save rossabaker/8872792b06bd84e5be8fae3c9caf8731 to your computer and use it in GitHub Desktop.
Save rossabaker/8872792b06bd84e5be8fae3c9caf8731 to your computer and use it in GitHub Desktop.
import $ivy.`org.typelevel::munit-cats-effect-3:1.0.7`
import $ivy.`org.tpolecat::natchez-mock:0.1.6`
import $ivy.`co.fs2::fs2-core:3.2.5`
import cats.data.Kleisli
import cats.effect.Deferred
import cats.effect.IO
import cats.effect.IOLocal
import cats.effect.MonadCancelThrow
import cats.effect.Resource
import cats.syntax.all._
import fs2.Pull
import fs2.Stream
import natchez._
import natchez.mock._
import io.opentracing.mock.{ MockSpan => OTMockSpan }
import io.opentracing.mock.MockTracer
import java.net.URI
import munit.CatsEffectSuite
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
trait TraceResource[F[_]] extends Trace[F] {
def resource[A](name: String)(r: Resource[F, A]): Resource[F, A]
}
object TraceResource {
def apply[F[_]](implicit ev: TraceResource[F]): ev.type = ev
def ioTrace(rootSpan: Span[IO]): IO[TraceResource[IO]] =
IOLocal(rootSpan).map { local =>
new TraceResource[IO] {
def allocated(name: String): IO[(Unit, IO[Unit])] =
(for {
parent <- Resource.eval(local.get)
child <- parent.span(name)
_ <- Resource.make(local.set(child))(_ => local.set(parent))
} yield ()).allocated
def resource[A](name: String)(r: Resource[IO, A]): Resource[IO, A] =
for {
parent <- Resource.eval(local.get)
child <- parent.span(name)
_ <- Resource.make(local.set(child))(_ => local.set(parent))
a <- r
} yield a
def span[A](name: String)(k: IO[A]): IO[A] =
resource(name)(Resource.unit).use(_ => k)
// We could be smarter about inheriting these...
def put(fields: (String, TraceValue)*): IO[Unit] =
local.get.flatMap(_.put(fields: _*))
def kernel: IO[Kernel] =
local.get.flatMap(_.kernel)
def traceId: IO[Option[String]] =
local.get.flatMap(_.traceId)
def traceUri: IO[Option[URI]] =
local.get.flatMap(_.traceUri)
}
}
import cats.effect.Sync
implicit def kleisliInstance[F[_]](implicit ev: Sync[F]): KleisliTraceResource[F] =
new KleisliTraceResource[F]
class KleisliTraceResource[F[_]](implicit ev: Sync[F]) extends TraceResource[Kleisli[F, Span[F], *]] {
def kernel: Kleisli[F, Span[F], Kernel] =
Kleisli(_.kernel)
def put(fields: (String, TraceValue)*): Kleisli[F, Span[F], Unit] =
Kleisli(_.put(fields: _*))
def resource[A](name: String)(r: Resource[Kleisli[F, Span[F], *], A]): Resource[Kleisli[F, Span[F], *], A] =
Resource.suspend(
Kleisli((span: Span[F]) =>
span.span(name).flatMap(child => r.mapK(Kleisli.applyK(child)))
).mapF(ra => ra.mapK(Kleisli.liftK[F, Span[F]]).pure[F])
)
def span[A](name: String)(k: Kleisli[F, Span[F], A]): Kleisli[F,Span[F],A] =
Kleisli(_.span(name).use(k.run))
def lens[E](f: E => Span[F], g: (E, Span[F]) => E): Trace[Kleisli[F, E, *]] =
new Trace[Kleisli[F, E, *]] {
def kernel: Kleisli[F,E,Kernel] =
Kleisli(e => f(e).kernel)
def put(fields: (String, TraceValue)*): Kleisli[F,E,Unit] =
Kleisli(e => f(e).put(fields: _*))
def span[A](name: String)(k: Kleisli[F, E, A]): Kleisli[F, E, A] =
Kleisli(e => f(e).span(name).use(s => k.run(g(e, s))))
def traceId: Kleisli[F,E,Option[String]] =
Kleisli(e => f(e).traceId)
def traceUri: Kleisli[F,E,Option[URI]] =
Kleisli(e => f(e).traceUri)
}
def traceId: Kleisli[F,Span[F],Option[String]] =
Kleisli(_.traceId)
def traceUri: Kleisli[F,Span[F],Option[URI]] =
Kleisli(_.traceUri)
}
}
class ClientTracingSuite extends CatsEffectSuite {
def lookupSpan(tracer: MockTracer, name: String): IO[OTMockSpan] =
IO.delay(tracer.finishedSpans.asScala.find(_.operationName === name)).flatMap {
case Some(span) => IO.pure(span)
case None => IO.raiseError(new NoSuchElementException(s"no span named ${name}"))
}
case class Request(editor: String)
case class Response(opinion: String)
type Client[F[_]] = Request => Resource[F, Response]
def client[F[_]]: Client[F] = { req =>
req match {
case Request("emacs") => Resource.pure(Response("good"))
case _ => Resource.pure(Response("bad"))
}
}
def tracedClient[F[_]: MonadCancelThrow: TraceResource](client: Client[F]): Client[F] = { req =>
TraceResource[F].resource("client")(
Resource(
Trace[F].span("acquire")(client(req).allocated).map { case (resp, release) =>
(resp, Trace[F].span("release")(release))
}
)
)
}
test("client tracing") {
val tracer = new MockTracer
for {
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start()))
_ <- TraceResource.ioTrace(root).flatMap { implicit trace =>
val req = Request("emacs")
tracedClient[IO](client).apply(req).use { resp =>
// ambient span here is client. Create child spans to our heart's delight
trace.span("use") {
trace.put(req.editor -> resp.opinion)
}
}
}
_ <- IO(root.span.finish())
clientSpan <- lookupSpan(tracer, "client")
acquireSpan <- lookupSpan(tracer, "acquire")
_ = assertEquals(acquireSpan.parentId, clientSpan.context.spanId)
releaseSpan <- lookupSpan(tracer, "release")
_ = assertEquals(releaseSpan.parentId, clientSpan.context.spanId)
useSpan <- lookupSpan(tracer, "use")
_ = assertEquals(useSpan.parentId, clientSpan.context.spanId)
_ = assertEquals(useSpan.tags.asScala.get("emacs"), Some("good"))
} yield ()
}
test("client tracing -- kleisli") {
val tracer = new MockTracer
val k = for {
root <- Kleisli.ask[IO, Span[IO]]
trace = Trace[Kleisli[IO, Span[IO], *]]
req = Request("emacs")
_ <- tracedClient[Kleisli[IO, Span[IO], *]](client).apply(req).use { resp =>
// ambient span here is client. Create child spans to our heart's delight
trace.span("use") {
trace.put(req.editor -> resp.opinion)
}
}
} yield ()
for {
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start()))
_ <- k.run(root)
_ <- IO(root.span.finish())
clientSpan <- lookupSpan(tracer, "client")
acquireSpan <- lookupSpan(tracer, "acquire")
_ = assertEquals(acquireSpan.parentId, clientSpan.context.spanId)
releaseSpan <- lookupSpan(tracer, "release")
_ = assertEquals(releaseSpan.parentId, clientSpan.context.spanId)
useSpan <- lookupSpan(tracer, "use")
_ = assert(clientSpan.startMicros <= acquireSpan.startMicros)
_ = assert(clientSpan.finishMicros >= releaseSpan.finishMicros)
_ = assertEquals(useSpan.parentId, root.span.context.spanId)
_ = assertEquals(useSpan.tags.asScala.get("emacs"), Some("good"))
} yield ()
}
test("asynchronous") {
val tracer = new MockTracer
for {
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start()))
done <- Deferred[IO, Unit]
_ <- Trace.ioTrace(root).flatMap { trace =>
trace.span("a") {
(IO.sleep(100.millis) *>
trace.span("b")(done.complete(()))
).start
} *>
trace.put("ambient" -> "root")
}
_ <- done.get
_ <- IO(root.span.finish())
a <- lookupSpan(tracer, "a")
b <- lookupSpan(tracer, "b")
_ = assertEquals(b.parentId, a.context.spanId)
_ = assert(b.startMicros > a.finishMicros)
_ = assertEquals(root.span.tags.asScala.get("ambient"), Some("root"))
} yield ()
}
test("weird and misnested") {
val tracer = new MockTracer
for {
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start()))
_ <- TraceResource.ioTrace(root).flatMap { trace =>
for {
a <- trace.resource("a")(Resource.unit).allocated
b <- trace.resource("b")(Resource.unit).allocated
_ <- a._2
_ <- b._2
// a is finished, and is now the ambient span. We've lost root.
c <- trace.resource("c")(Resource.unit).use_
} yield ()
}
a <- lookupSpan(tracer, "a")
c <- lookupSpan(tracer, "c")
_ <- IO(root.span.finish())
_ = assertEquals(c.parentId, a.context.spanId)
} yield ()
}
test("stream") {
val tracer = new MockTracer
for {
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start()))
_ <- TraceResource.ioTrace(root).flatMap { trace =>
(for {
_ <- Stream.resource(trace.resource("stream")(Resource.unit))
s <- Stream(1, 2, 3).foldMonoid
_ <- Stream.eval(trace.put("sum" -> s))
} yield ()).compile.drain
}
stream <- lookupSpan(tracer, "stream")
_ = assertEquals(stream.parentId, root.span.context.spanId)
_ = assertEquals(stream.tags.asScala.get("sum"), Some(6))
} yield ()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment