Skip to content

Instantly share code, notes, and snippets.

@zvuki
Last active June 25, 2018 18:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zvuki/4dcc084213d3e5af06405959f39a9cc8 to your computer and use it in GitHub Desktop.
Save zvuki/4dcc084213d3e5af06405959f39a9cc8 to your computer and use it in GitHub Desktop.
/**
 * Mixin for running asynchronous work inside an OpenTracing span.
 */
trait Traced {

  /** Tracer used to build new spans and to look up the currently active one. */
  def tracer: Tracer

  /**
   * Runs `spanFn` inside a new span called `name` and finishes that span when
   * the returned future completes.
   *
   * The span is active on the calling thread only while `spanFn` is being
   * invoked; the scope is closed before this method returns. On failure the
   * span is tagged with `error`, `exception.message` and `exception` before it
   * is finished.
   *
   * @param name   operation name of the span
   * @param spanFn asynchronous work to trace; receives the newly started span
   * @param ec     execution context on which the completion callback runs
   * @tparam T     result type of the traced future
   * @return a future that completes with the result of `spanFn` after the span
   *         has been finished
   */
  def traced[T](name: String)(spanFn: Span => Future[T])(implicit ec: ExecutionContext): Future[T] = {
    // finishSpanOnClose = false: the scope is closed synchronously in the
    // `finally` below, long before the future completes. Letting the scope
    // finish the span would end it too early and the completion callback would
    // then tag and finish an already-finished span.
    val scope = tracer.buildSpan(name).startActive(false)
    val span = scope.span()
    try {
      spanFn(span).andThen {
        case Success(_) =>
          span.finish()
        case Failure(ex) =>
          markFailed(span, ex)
          span.finish()
      }
    } catch {
      // spanFn threw before producing a future: no callback will ever run, so
      // the span has to be tagged and finished here before rethrowing.
      case scala.util.control.NonFatal(ex) =>
        markFailed(span, ex)
        span.finish()
        throw ex
    } finally {
      // only deactivates the span on this thread; it does not finish it
      scope.close()
    }
  }

  /** Tags `span` with the error flag and the details of `ex`. */
  private def markFailed(span: Span, ex: Throwable): Unit = {
    span.setTag("error", true)
    span.setTag("exception.message", ex.getMessage)
    span.setTag("exception", ex.getClass.toString)
  }

  /**
   * java's activeSpan is null when there is no active span;
   * returns Scala Option which is easier to deal with
   */
  def activeSpan: Option[Span] =
    Option(tracer.activeSpan())
}
// Example usage: runs the request inside a span named "AkkaHttpRequest" and
// tags the span with request and response details.
// NOTE(review): the closing brace of this traced { ... } block is not visible
// in this excerpt.
traced("AkkaHttpRequest") { span =>
// tag the span with the request URI and HTTP method before calling `next`
span.setTag("http.uri", httpReqTraced.uri.toString)
span.setTag("http.method", httpReqTraced.method.value)
// presumably `next` is the downstream handler returning a Future of the response — confirm
next(httpReqTraced)
.andThen {
// only a successful future is matched here; a failed future falls through this callback
case Success(rsp) =>
rsp.status match {
// server-error statuses flag the span as an error; this branch does not record http.status_code
case ServerError(_) => span.setTag("error", true)
// every other status records its numeric status code
case _ =>
span.setTag("http.status_code", rsp.status.intValue())
}
}
/**
 * [[ThreadPropagation]] that carries the active OpenTracing span from the
 * thread that schedules work to the thread that runs it.
 *
 * @param tracer tracer whose active span is captured and whose scope manager
 *               re-activates that span around the wrapped runnable
 */
class OpenTracingThreadPropagation(tracer: Tracer) extends ThreadPropagation[Span] {
  val scopeManager = tracer.scopeManager()

  // captures the span at the call site (null when no span is active)
  override def captureThreadState(): Span = tracer.activeSpan()

  // copies the span into the new thread being spawned
  override def wrapWithState(runnable: Runnable, state: Span): Runnable = new Runnable {
    override def run(): Unit = {
      // finishSpanOnClose = false: the span belongs to whoever started it and
      // may be propagated to many runnables. Finishing it when this scope
      // closes would end the span after the first callback.
      val scope = Option(state).flatMap(span => Option(scopeManager.activate(span, false)))
      try {
        runnable.run()
      } finally {
        // restore the previous thread state even if the runnable threw
        scope.foreach(_.close())
      }
    }
  }
}
/**
 * Decorator around an existing execution context that can carry captured
 * thread state (for example a tracing context or logging MDC kept in a
 * [[ThreadLocal]]) over to the thread that eventually runs a task.
 *
 * State is captured when `prepare()` is called. The context returned by
 * `prepare()` wraps every submitted task with each captured state and then
 * hands it to this context, which delegates to `wrapped`. Tasks submitted
 * directly through `execute` on this class are passed to `wrapped` unchanged.
 *
 * @param wrapped      execution context that actually runs the tasks and
 *                     receives failure reports
 * @param propagations propagations whose state is captured in `prepare()` and
 *                     re-applied around each task
 */
class PropagatingExecutionContext(wrapped: ExecutionContext,
                                  propagations: Seq[ThreadPropagation[_]])
  extends ExecutionContextExecutor { outer =>

  /** Delegates straight to the wrapped context without wrapping the task. */
  override def execute(runnable: Runnable): Unit = {
    wrapped.execute(runnable)
  }

  /** Forwards failure reports to the wrapped context. */
  override def reportFailure(cause: Throwable): Unit = {
    wrapped.reportFailure(cause)
  }

  /** Asks every propagation to capture its state on the calling thread. */
  protected def captureCallSiteState(): Seq[CapturedThreadState[_]] =
    propagations.map(_.capture())

  /**
   * Captures the call-site state once and returns a context that re-applies
   * it around every task it is given.
   */
  override def prepare(): ExecutionContext = new ExecutionContext {
    // taken once, when this prepared context is constructed
    private val snapshot = captureCallSiteState()

    override def prepare(): ExecutionContext = outer.prepare()

    override def execute(runnable: Runnable): Unit = {
      // foldRight: the last propagation wraps the task first, so the first
      // propagation in the list ends up as the outermost wrapper
      val decorated = snapshot.foldRight(runnable)((captured, inner) => captured.wrap(inner))
      outer.execute(decorated)
    }

    override def reportFailure(cause: Throwable): Unit = outer.reportFailure(cause)
  }
}
/**
 * A captured piece of thread state paired with the propagation that knows how
 * to re-apply it.
 *
 * @param state       the captured value
 * @param propagation propagation used to wrap runnables with `state`
 */
private[concurrent] case class CapturedThreadState[S](state: S, propagation: ThreadPropagation[S]) {

  /** Returns `r` wrapped by `propagation.wrapWithState` using the captured state. */
  def wrap(r: Runnable): Runnable = {
    val decorated = propagation.wrapWithState(r, state)
    decorated
  }
}
/**
 * Strategy for capturing a piece of state of type `S` and re-applying it
 * around a runnable.
 *
 * @tparam S type of the captured state
 */
trait ThreadPropagation[S] {

  /** Captures the state to propagate. */
  def captureThreadState(): S

  /** Returns a runnable that runs `runnable` with `state` applied. */
  def wrapWithState(runnable: Runnable, state: S): Runnable

  /** Captures the current state and pairs it with this propagation. */
  private[concurrent] def capture(): CapturedThreadState[_] = {
    val current = captureThreadState()
    CapturedThreadState(current, this)
  }
}
/** Work-stealing pool backing the asynchronous execution context; created on first access. */
lazy val AsyncExecutor: ExecutorService =
  Executors.newWorkStealingPool()
/** Thread-state propagations applied, in this order, by the propagating execution context. */
lazy val propagations: Seq[ThreadPropagation[_]] = {
  val grpcContext = new GrpcContextThreadPropagation()
  val mdc = new MDCThreadPropagation
  val openTracing = new OpenTracingThreadPropagation(Application.tracer)
  val newRelic = new NewRelicTransactionPropagation
  Seq(grpcContext, mdc, openTracing, newRelic)
}
/** Execution context that runs on `AsyncExecutor` and applies `propagations`. */
lazy val AsyncThreadPool: ExecutionContext = {
  val underlying = ExecutionContext.fromExecutorService(AsyncExecutor)
  new PropagatingExecutionContext(underlying, propagations)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment