Skip to content

Instantly share code, notes, and snippets.

Last active June 25, 2018 18:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zvuki/4dcc084213d3e5af06405959f39a9cc8 to your computer and use it in GitHub Desktop.
Save zvuki/4dcc084213d3e5af06405959f39a9cc8 to your computer and use it in GitHub Desktop.
trait Traced {
def tracer: Tracer
def traced[T](name: String)(spanFn: Span => Future[T])(implicit ec: ExecutionContext): Future[T] = {
val scope = tracer.buildSpan(name).startActive(true)
val span = scope.span()
try {
spanFn(span).andThen {
case Success(_) =>
case Failure(ex) =>
span.setTag("error", true)
span.setTag("exception.message", ex.getMessage)
span.setTag("exception", ex.getClass.toString)
} finally {
* java's activeSpan is null when there is no active span;
* returns Scala Option which is easier to deal with
def activeSpan: Option[Span] =
traced("AkkaHttpRequest") { span =>
span.setTag("http.uri", httpReqTraced.uri.toString)
span.setTag("http.method", httpReqTraced.method.value)
.andThen {
case Success(rsp) =>
rsp.status match {
case ServerError(_) => span.setTag("error", true)
case _ =>
span.setTag("http.status_code", rsp.status.intValue())
class OpenTracingThreadPropagation(tracer: Tracer) extends ThreadPropagation[Span] {
val scopeManager = tracer.scopeManager()
// captures the span at the call site
override def captureThreadState(): Span = tracer.activeSpan()
// copies the span into the new thread being spawned
override def wrapWithState(runnable: Runnable, state: Span): Runnable = new Runnable {
override def run(): Unit = {
val scope =
if(state != null) scopeManager.activate(state, true)
else null
try {
} finally {
if(scope != null) {
* This execution context is a wrapper around an existing execution
* context implementation that propagates state across thread boundaries.
* This functionality is commonly used to carry diagnostic information
* (such as tracing context or logback MDCs) through an asynchronous
* flow of code that may span many different threads.
* For example, a request might come into a server on one thread
* with a tracing ID stored in a [[ThreadLocal]]. Given the right [[ThreadPropagation]]
* instance, this execution context will transparently carry that
* thread local context to every new callback attached using it.
* @param wrapped
* @param propagations
class PropagatingExecutionContext(wrapped:ExecutionContext,
extends ExecutionContextExecutor { self =>
override def execute(runnable: Runnable): Unit =
override def reportFailure(cause: Throwable): Unit =
protected def captureCallSiteState():Seq[CapturedThreadState[_]] = => p.capture())
override def prepare(): ExecutionContext = {
new ExecutionContext {
private val callSiteState = captureCallSiteState()
override def reportFailure(cause: Throwable): Unit =
override def execute(runnable: Runnable): Unit = {
val r = callSiteState.foldRight(runnable) { (s,acc) =>
override def prepare(): ExecutionContext = self.prepare()
private[concurrent] case class CapturedThreadState[S](state:S,propagation:ThreadPropagation[S]) {
def wrap(r:Runnable):Runnable = propagation.wrapWithState(r, state)
trait ThreadPropagation[S] {
private[concurrent] def capture():CapturedThreadState[_] =
CapturedThreadState(captureThreadState(), this)
def captureThreadState(): S
def wrapWithState(runnable:Runnable, state:S): Runnable
lazy val AsyncExecutor: ExecutorService = {
lazy val propagations:Seq[ThreadPropagation[_]] = Seq(
new GrpcContextThreadPropagation(),
new MDCThreadPropagation,
new OpenTracingThreadPropagation(Application.tracer),
new NewRelicTransactionPropagation
lazy val AsyncThreadPool: ExecutionContext =
new PropagatingExecutionContext(
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment