Skip to content

Instantly share code, notes, and snippets.

@codeniko
Forked from jriecken/AsyncController.diff
Created December 1, 2016 01:32
Show Gist options
  • Save codeniko/16dc778fd273d13a1be7910e7f3c4d2b to your computer and use it in GitHub Desktop.
Save codeniko/16dc778fd273d13a1be7910e7f3c4d2b to your computer and use it in GitHub Desktop.
Contextual Logging
import play.api.Logger
import org.slf4j.MDC
+import scala.concurrent.{ExecutionContext, Future}
import java.util.UUID
+import java.util.concurrent.Executors
-object SyncController extends Controller {
- def syncEndpoint(profileId: Long) = Action(parse.json) { req =>
+object AsyncController extends Controller {
+ implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
+ // For this example, ensure the threads in the pool are already created
+ // to avoid Logback's InheritableThreadLocal poisoning the new threads
+ // with bad MDC data
+ for (i <- 1 to 2) Future {}
+
+ def asyncEndpoint(profileId: Long) = Action.async(parse.json) { req =>
try {
MDC.put("reqId", UUID.randomUUID().toString)
MDC.put("profileId", profileId.toString)
+ // This is running on the same thread that set the MDC values
Logger.info("Starting request")
- val postIds = for (message <- req.body.as[List[String]]) yield {
+ val futurePostIds = for (message <- req.body.as[List[String]]) yield {
sendMessage(profileId, message)
}
- Logger.info(s"Finished posting. Result: $postIds")
- Ok
+ Future.sequence(futurePostIds).map { postIds =>
+ // This is running on one of the ExecutionContext threads
+ Logger.info(s"Finished posting. Result: $postIds")
+ Ok
+ }
} finally {
MDC.clear()
}
}
- private def sendMessage(profileId: Long, message: String): String = {
+ private def sendMessage(profileId: Long, message: String): Future[String] = Future {
+ // This is running on one of the ExecutionContext threads
Logger.info(s"Sending message to profile $profileId: $message")
UUID.randomUUID().toString
}
- implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
+ implicit val ec = MDCPropagatingExecutionContextWrapper(
+ ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
+ )
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Starting request
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Sending message to profile 54321: message 2
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Sending message to profile 54321: message 1
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Sending message to profile 54321: message 4
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Sending message to profile 54321: message 3
{reqId=ef275f56-c8b7-41c7-a89f-1e26577af920, profileId=54321} - Finished posting. Result: List(8847c89d-a26a-46b3-a13e-7eed1170205d, 314acabf-ab62-4dd1-9de2-46495f4e8c86, 44c9e7d7-7512-4380-8a30-a75555b789b0, 627133dc-00ea-4427-8cd5-389ccd834eed)
{reqId=fca8a21f-8303-480f-9136-60e602ea621f, profileId=12345} - Starting request
{} - Sending message to profile 12345: message 1
{} - Sending message to profile 12345: message 3
{} - Sending message to profile 12345: message 2
{} - Sending message to profile 12345: message 4
{} - Finished posting. Result: List(d2428353-4889-49a3-a886-4752493bd701, 0eea698e-a1c0-4f64-8d0a-629cbe359483, 22b86bc1-d36d-402c-a00e-0e7bdd48f9a9, 2dcefb11-be12-4540-aca3-0179c53228ad)
/**
 * A named holder for a single value of type `T`, exposing `set` / `get` / `clear`.
 *
 * The constructor is private, so instances can only be built from inside this
 * class or its companion (the companion's `create` / `createProxy` factories).
 * Every instance registers itself with the companion as part of construction.
 *
 * This is an API sketch: the method bodies are elided (`...`).
 */
final class ContextLocal[T] private(
// Unique name for the ContextLocal
private val name: String,
// Optionally have this ContextLocal proxy something else.
// NOTE(review): presumably set/get/clear delegate to the proxy when it is defined;
// the implementation is not shown here - confirm.
private val proxy: Option[ContextLocal.Proxy[T]] = None
) {
// Register with the global ThreadLocal state
// (runs in the constructor body, i.e. once per instance creation)
ContextLocal.register(this)
/**
 * Set the current value
 *
 * @param v the value to store
 */
def set(v: T): Unit = ...
/**
 * Get the current value
 *
 * @return the value wrapped in an Option
 *         (presumably None when nothing has been set - implementation not shown)
 */
def get: Option[T] = ...
/**
 * Clear the current value
 */
def clear(): Unit = ...
}
/**
 * Companion of the ContextLocal class: declares the Proxy callback interface,
 * holds the registry of created ContextLocals, and offers factory and
 * propagation methods.
 *
 * This is an API sketch: the `State` type and most method bodies are elided (`...`).
 */
object ContextLocal {
/**
 * Callback interface for a ContextLocal that proxies some other storage.
 */
trait Proxy[T] {
/**
 * Called when the ContextLocal value is set
 * (usually as a result of setContext)
 */
def set(v: T): Unit
/**
 * Called when the ContextLocal value is retrieved
 * (usually as a result of getContext)
 */
def get: Option[T]
/**
 * Called when the ContextLocal value is cleared
 * (usually as a result of clearContext)
 */
def clear(): Unit
}
type State = ... // The actual type of this is an implementation detail
/*
 * Registration + internal state
 */
// Per-thread storage for the State
private[this] val state = new ThreadLocal[State]
// Every ContextLocal created so far, keyed by name; starts out empty
private[this] val registry = new AtomicReference(Map.empty[String, ContextLocal[_]])
// This asserts that two ContextLocals do not have the same name, and safely updates the registry
private def register(contextLocal: ContextLocal[_]): Unit = ...
/*
 * Factory Methods
 */
/**
 * Create a plain ContextLocal.
 *
 * @param name unique name; defaults to a freshly generated random UUID string
 */
def create[T](name: String = UUID.randomUUID().toString): ContextLocal[T] = new ContextLocal(name)
/**
 * Create a ContextLocal backed by the given proxy.
 *
 * `proxy` has no default and follows `name`, so a caller that wants the default
 * name has to pass the proxy as a named argument (`createProxy(proxy = ...)`).
 *
 * @param name  unique name; defaults to a freshly generated random UUID string
 * @param proxy the proxy handed to the new ContextLocal
 */
def createProxy[T](name: String = UUID.randomUUID().toString, proxy: Proxy[T]): ContextLocal[T] = new ContextLocal(name, Some(proxy))
/*
 * Propagation Methods
 */
/**
 * Get the current state of all ContextLocals
 */
def getContext: State = ...
/**
 * Set the current state of all ContextLocals to one retrieved earlier by getContext
 */
def setContext(state: State): Unit = ...
/**
 * Clear the value of all the current ContextLocals
 */
def clearContext(): Unit = ...
/**
 * Execute a block of code using the specified state of ContextLocals
 * and restore the current state when complete (success or failure)
 *
 * @param state the state to apply while `f` runs
 * @param f     the block to run (passed by name)
 * @return the result of `f`
 */
def withContext[T](state: State)(f: => T): T = ...
/**
 * Execute a block of code with a clear state of ContextLocals
 * and restore the current state when complete (success or failure)
 *
 * @param f the block to run (passed by name)
 * @return the result of `f`
 */
def withClearContext[T](f: => T): T = ...
}
// Logs a send failure with the user, social network, profile and error code
// interpolated by hand into the message text itself.
log.error(s"Failed to send message. User: $userId, SocialNetwork: $socialNetworkType, Profile: $profileId, ErrorCode: $errorCode")
package akka.actor
import akka.util.Timeout
import org.slf4j.MDC
import scala.concurrent.Future
/**
 * Actor mixin that unwraps `MdcMsg` envelopes: the MDC shipped inside the
 * envelope is installed for the duration of the receive, and whatever MDC the
 * thread had beforehand is put back afterwards, whether the receive succeeds
 * or throws. Messages that are not wrapped are handled as usual.
 */
trait MDCContextAware extends Actor {
  import MDCContextAware._

  // aroundReceive is protected[akka]; overriding it is the reason this
  // trait has to live in package akka.actor
  override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // Snapshot of this thread's MDC before the message is handled (null when empty)
    val savedMdc = MDC.getCopyOfContextMap
    try {
      // Unwrap the envelope (installing the sender's MDC) or pass the message through
      val payload = msg match {
        case MdcMsg(senderMdc, wrapped) =>
          if (senderMdc == null) MDC.clear() else MDC.setContextMap(senderMdc)
          wrapped
        case other =>
          other
      }
      super.aroundReceive(receive, payload)
    } finally {
      // Always put back what was there before
      if (savedMdc == null) MDC.clear() else MDC.setContextMap(savedMdc)
    }
  }
}
/**
 * Companion of the MDCContextAware trait: defines the private envelope used to
 * ship MDC data with a message and the implicit operators that build it.
 */
object MDCContextAware {
  /**
   * Envelope pairing a message with a snapshot of the sender's MDC.
   * `mdc` may be null; the receiving trait checks for that before installing it.
   */
  private case class MdcMsg(mdc: java.util.Map[_, _], msg: Any)

  object Implicits {
    /**
     * Add two new methods that allow MDC info to be passed to MDCContextAware actors.
     *
     * Do NOT use these methods to send to actors that are not MDCContextAware:
     * they would receive the private envelope instead of the original message.
     */
    implicit class ContextLocalAwareActorRef(val ref: ActorRef) extends AnyVal {
      import akka.pattern.ask

      /**
       * Send a message to an actor that is MDCContextAware - it will propagate
       * the current MDC values.
       *
       * The implicit sender is forwarded to `!`, so that inside an actor the
       * receiver sees the sending actor as `sender()`, exactly as with a plain
       * `!`. When no implicit ActorRef is in scope it falls back to
       * `Actor.noSender`, which is what every call used before.
       *
       * @param msg    the message to deliver
       * @param sender the actor to report as the sender of the message
       */
      def !>(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit =
        ref.!(MdcMsg(MDC.getCopyOfContextMap, msg))(sender)

      /**
       * "Ask" an actor that is MDCContextAware for something - it will propagate
       * the current MDC values
       *
       * @param msg     the message to deliver
       * @param timeout how long the ask may wait for a reply
       * @return a Future of the reply
       */
      def ?>(msg: Any)(implicit timeout: Timeout): Future[Any] =
        ref ? MdcMsg(MDC.getCopyOfContextMap, msg)
    }
  }
}
// Usage example: an actor opts in to MDC propagation by mixing in MDCContextAware
class SendingActor extends Actor with MDCContextAware {
//...
}
// ...
// Bring the `!>` and `?>` operators into scope
import MDCContextAware.Implicits._
val awareActor = //...
// `!>` sends the message together with a copy of the caller's current MDC
awareActor !> "message with MDC info passed along"
import org.slf4j.MDC
import scala.concurrent.ExecutionContext
/**
 * ExecutionContext mixin whose `prepare()` captures the caller's MDC and
 * returns a context that installs that MDC around every task it runs.
 *
 * NOTE(review): propagation only happens for code paths that call `prepare()`
 * before executing - confirm this holds for the Scala version in use.
 */
trait MDCPropagatingExecutionContext extends ExecutionContext {
  // Alias for the outer instance so the anonymous context below can delegate to it
  self =>

  override def prepare(): ExecutionContext = new ExecutionContext {
    // MDC as it was at the call site, i.e. when prepare() was invoked (null when empty)
    val callSiteMdc = MDC.getCopyOfContextMap

    def execute(r: Runnable): Unit = {
      val mdcAwareTask = new Runnable {
        def run(): Unit = {
          // Whatever MDC the executing thread currently holds (null when empty)
          val executionSiteMdc = MDC.getCopyOfContextMap
          try {
            // Make the call-site MDC visible to the task
            if (callSiteMdc == null) MDC.clear() else MDC.setContextMap(callSiteMdc)
            r.run()
          } finally {
            // Put the executing thread's own MDC back, success or failure
            if (executionSiteMdc == null) MDC.clear() else MDC.setContextMap(executionSiteMdc)
          }
        }
      }
      self.execute(mdcAwareTask)
    }

    def reportFailure(t: Throwable): Unit = self.reportFailure(t)
  }
}
/**
 * Companion holding a ready-made, MDC-propagating implicit ExecutionContext.
 */
object MDCPropagatingExecutionContext {
  object Implicits {
    // Convenience wrapper around the Scala global ExecutionContext so you can just do:
    // import MDCPropagatingExecutionContext.Implicits.global
    //
    // The type is spelled out (it is the same type that was inferred before):
    // an implicit without an explicit type makes implicit resolution depend on
    // definition order, and Scala 3 rejects it.
    implicit lazy val global: MDCPropagatingExecutionContextWrapper =
      MDCPropagatingExecutionContextWrapper(ExecutionContext.Implicits.global)
  }
}
/**
 * Adapts an existing ExecutionContext by mixing in MDCPropagatingExecutionContext.
 * Task execution and failure reporting are handed straight to the wrapped context.
 *
 * @param wrapped the ExecutionContext that actually runs the tasks
 */
class MDCPropagatingExecutionContextWrapper(wrapped: ExecutionContext)
  extends ExecutionContext with MDCPropagatingExecutionContext {

  // Pass the task to the underlying context unchanged
  override def execute(r: Runnable): Unit = {
    wrapped.execute(r)
  }

  // Failures are reported by the underlying context
  override def reportFailure(t: Throwable): Unit = {
    wrapped.reportFailure(t)
  }
}
/**
 * Factory so the wrapper can be built without `new`.
 */
object MDCPropagatingExecutionContextWrapper {
  /**
   * @param wrapped the ExecutionContext to wrap
   * @return a new wrapper around `wrapped`
   */
  def apply(wrapped: ExecutionContext): MDCPropagatingExecutionContextWrapper =
    new MDCPropagatingExecutionContextWrapper(wrapped)
}
/** Prepares for the execution of a task. Returns the prepared execution context.
*
* `prepare` should be called at the site where an `ExecutionContext` is received (for
* example, through an implicit method parameter). The returned execution context may
* then be used to execute tasks. The role of `prepare` is to save any context relevant
* to an execution's ''call site'', so that this context may be restored at the
* ''execution site''. (These are often different: for example, execution may be
* suspended through a `Promise`'s future until the `Promise` is completed, which may
* be done in another thread, on another stack.)
*
* Note: a valid implementation of `prepare` is one that simply returns `this`.
*
* @return the prepared execution context
*/
import play.api.mvc._
import play.api.Logger
import org.slf4j.MDC
import java.util.UUID
/**
 * Synchronous example controller: request handling stays on a single thread,
 * so the MDC values set at the start are still in place for every log
 * statement issued while the request is processed.
 */
object SyncController extends Controller {

  /**
   * Tags the MDC with a fresh request id and the profile id, "sends" every
   * message from the JSON body (expected to be a list of strings) and answers Ok.
   * The MDC is cleared when handling finishes, whether it succeeds or throws.
   */
  def syncEndpoint(profileId: Long) = Action(parse.json) { req =>
    try {
      MDC.put("reqId", UUID.randomUUID().toString)
      MDC.put("profileId", profileId.toString)
      Logger.info("Starting request")
      val messages = req.body.as[List[String]]
      val postIds = messages.map(message => sendMessage(profileId, message))
      Logger.info(s"Finished posting. Result: $postIds")
      Ok
    } finally {
      MDC.clear()
    }
  }

  /**
   * Logs the message being sent and returns a random UUID string as the post id.
   */
  private def sendMessage(profileId: Long, message: String): String = {
    Logger.info(s"Sending message to profile $profileId: $message")
    val postId = UUID.randomUUID().toString
    postId
  }
}
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Starting request
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Sending message to profile 12345: message 1
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Sending message to profile 12345: message 2
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Sending message to profile 12345: message 3
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Sending message to profile 12345: message 4
{reqId=bd64ec57-5a99-4453-87b1-9d4dd17401ee, profileId=12345} - Finished posting. Result: List(855db043-8805-4835-854d-185b1e755563, f260f7e8-2c91-4970-b1ff-8284a0d3a8b3, 66716091-c581-42fb-ae40-facc909ef2a7, d253e9bf-fa0f-45b7-8efe-cbba32e74ccf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment