Last active
June 25, 2018 18:09
-
-
Save zvuki/4dcc084213d3e5af06405959f39a9cc8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
trait Traced { | |
def tracer: Tracer | |
def traced[T](name: String)(spanFn: Span => Future[T])(implicit ec: ExecutionContext): Future[T] = { | |
val scope = tracer.buildSpan(name).startActive(true) | |
val span = scope.span() | |
try { | |
spanFn(span).andThen { | |
case Success(_) => | |
span.finish() | |
case Failure(ex) => | |
span.setTag("error", true) | |
span.setTag("exception.message", ex.getMessage) | |
span.setTag("exception", ex.getClass.toString) | |
span.finish() | |
} | |
} finally { | |
scope.close() | |
} | |
} | |
/** | |
* java's activeSpan is null when there is no active span; | |
* returns Scala Option which is easier to deal with | |
*/ | |
def activeSpan: Option[Span] = | |
Option(tracer.activeSpan()) | |
} | |
traced("AkkaHttpRequest") { span => | |
span.setTag("http.uri", httpReqTraced.uri.toString) | |
span.setTag("http.method", httpReqTraced.method.value) | |
next(httpReqTraced) | |
.andThen { | |
case Success(rsp) => | |
rsp.status match { | |
case ServerError(_) => span.setTag("error", true) | |
case _ => | |
span.setTag("http.status_code", rsp.status.intValue()) | |
} | |
} | |
class OpenTracingThreadPropagation(tracer: Tracer) extends ThreadPropagation[Span] { | |
val scopeManager = tracer.scopeManager() | |
// captures the span at the call site | |
override def captureThreadState(): Span = tracer.activeSpan() | |
// copies the span into the new thread being spawned | |
override def wrapWithState(runnable: Runnable, state: Span): Runnable = new Runnable { | |
override def run(): Unit = { | |
val scope = | |
if(state != null) scopeManager.activate(state, true) | |
else null | |
try { | |
runnable.run() | |
} finally { | |
if(scope != null) { | |
scope.close() | |
} | |
} | |
} | |
} | |
} | |
/** | |
* This execution context is a wrapper around an existing execution | |
* context implementation that propagates state across thread boundaries. | |
* | |
* This functionality is commonly used to carry diagnostic information | |
* (such as tracing context or logback MDCs) through an asynchronous | |
* flow of code that may span many different threads. | |
* | |
* For example, a request might come into a server on one thread | |
* with a tracing ID stored in a [[ThreadLocal]]. Given the right [[ThreadPropagation]] | |
* instance, this execution context will transparently carry that | |
* thread local context to every new callback attached using it. | |
* | |
* @param wrapped | |
* @param propagations | |
*/ | |
class PropagatingExecutionContext(wrapped:ExecutionContext, | |
propagations:Seq[ThreadPropagation[_]]) | |
extends ExecutionContextExecutor { self => | |
override def execute(runnable: Runnable): Unit = | |
wrapped.execute(runnable) | |
override def reportFailure(cause: Throwable): Unit = | |
wrapped.reportFailure(cause) | |
protected def captureCallSiteState():Seq[CapturedThreadState[_]] = | |
propagations.map(p => p.capture()) | |
override def prepare(): ExecutionContext = { | |
new ExecutionContext { | |
private val callSiteState = captureCallSiteState() | |
override def reportFailure(cause: Throwable): Unit = | |
self.reportFailure(cause) | |
override def execute(runnable: Runnable): Unit = { | |
val r = callSiteState.foldRight(runnable) { (s,acc) => | |
s.wrap(acc) | |
} | |
self.execute(r) | |
} | |
override def prepare(): ExecutionContext = self.prepare() | |
} | |
} | |
} | |
private[concurrent] case class CapturedThreadState[S](state:S,propagation:ThreadPropagation[S]) { | |
def wrap(r:Runnable):Runnable = propagation.wrapWithState(r, state) | |
} | |
trait ThreadPropagation[S] { | |
private[concurrent] def capture():CapturedThreadState[_] = | |
CapturedThreadState(captureThreadState(), this) | |
def captureThreadState(): S | |
def wrapWithState(runnable:Runnable, state:S): Runnable | |
} | |
lazy val AsyncExecutor: ExecutorService = { | |
Executors.newWorkStealingPool() | |
} | |
lazy val propagations:Seq[ThreadPropagation[_]] = Seq( | |
new GrpcContextThreadPropagation(), | |
new MDCThreadPropagation, | |
new OpenTracingThreadPropagation(Application.tracer), | |
new NewRelicTransactionPropagation | |
) | |
lazy val AsyncThreadPool: ExecutionContext = | |
new PropagatingExecutionContext( | |
ExecutionContext.fromExecutorService(AsyncExecutor), | |
propagations | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment