Skip to content

Instantly share code, notes, and snippets.

@havocp
Created July 12, 2012 13:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save havocp/3097999 to your computer and use it in GitHub Desktop.
Save havocp/3097999 to your computer and use it in GitHub Desktop.
BatchingExecutionContext
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.concurrent.impl
import java.util.concurrent.Executor
import scala.concurrent.util.Duration
import scala.concurrent.{Awaitable, ExecutionContext, CanAwait}
import scala.util.control.NonFatal
/**
* An ExecutionContext which attempts to keep related Runnables batched on the
* same thread, which may give better performance by 1) avoiding dispatch
* through the ExecutionContext's queue and 2) creating a simple
* "CPU affinity" for a related chain of tasks.
*/
private class BatchingExecutionContext(val delegate: ExecutionContext) extends ExecutionContext with Executor {
// invariant: if "_tasksLocal.get ne null" then we are inside
// BatchingRunnable.run; if it is null, we are outside
// (the list is a per-thread stack of tasks queued during the batch,
// so no synchronization beyond the ThreadLocal itself is needed)
private val _tasksLocal = new ThreadLocal[List[Runnable]]()
// only valid to call if _tasksLocal.get ne null
// prepends to the head of the list; together with pop() below this
// makes the queued tasks run in LIFO order
private def push(runnable: Runnable): Unit =
_tasksLocal.set(runnable :: _tasksLocal.get)
// only valid to call if _tasksLocal.get ne null
private def nonEmpty(): Boolean =
_tasksLocal.get.nonEmpty
// only valid to call if _tasksLocal.get ne null
// (and only when nonEmpty; head/tail on Nil would throw)
private def pop(): Runnable = {
val tasks = _tasksLocal.get
_tasksLocal.set(tasks.tail)
tasks.head
}
// BlockContext installed for the duration of BatchingRunnable.run:
// before the thread actually blocks, it flushes any queued-but-not-yet-run
// tasks to another thread via `delegate`, so they cannot deadlock by
// waiting behind the task that is about to block.
private class BatchingBlockContext(previous: BlockContext) extends BlockContext {
override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
// if we know there will be blocking, we don't want to
// keep tasks queued up because it could deadlock.
_tasksLocal.get match {
case null =>
// not inside a BatchingRunnable
case Nil =>
// inside a BatchingRunnable, but nothing is queued up
case list => {
// inside a BatchingRunnable and there's a queue;
// make a new BatchingRunnable and send it to
// another thread
_tasksLocal set Nil
delegate.execute(new BatchingRunnable(list))
}
}
// now delegate the blocking to the previous BC
previous.internalBlockingCall(awaitable, atMost)
}
}
// ONLY BatchingRunnable should be sent directly
// to delegate.execute()
private class BatchingRunnable(val initial: List[Runnable]) extends Runnable {
// this method runs in the delegate ExecutionContext's thread
override def run(): Unit = {
require(_tasksLocal.get eq null)
// capture the previous BlockContext so real blocking can still be
// delegated to it after we flush our queue
val bc = new BatchingBlockContext(BlockContext.current)
BlockContext.withBlockContext(bc) {
try {
_tasksLocal set initial
// drain the per-thread queue; tasks executed here may push more
// tasks (via execute below), which extends this same loop instead
// of re-dispatching through the delegate
while (nonEmpty) {
val next = pop()
// NonFatal only: fatal errors (OOM, interrupts, ...) must propagate
try next.run() catch { case NonFatal(e) => reportFailure(e) }
}
} finally {
// restore the "outside BatchingRunnable" state even on failure,
// so the invariant on _tasksLocal holds for the next batch
_tasksLocal.remove()
require(_tasksLocal.get eq null)
}
}
}
}
// Entry point: outside a batch, start a new BatchingRunnable on the
// delegate; inside a batch, just queue onto the current thread's list.
override def execute(runnable: Runnable): Unit = {
_tasksLocal.get match {
case null =>
// outside BatchingRunnable.run: start a new batch
delegate.execute(new BatchingRunnable(runnable :: Nil))
case _ =>
// inside BatchingRunnable.run: add to existing batch, existing BatchingRunnable will run it
push(runnable)
}
}
// failures are reported through the wrapped context unchanged
def reportFailure(t: Throwable): Unit = delegate.reportFailure(t)
}
/** Factory for the batching decorator. */
object BatchingExecutionContext {
  /**
   * Wraps `delegate` so that `execute` calls made while one of its tasks
   * is running are batched onto the same thread instead of being
   * dispatched back through `delegate`'s queue.
   */
  def apply(delegate: ExecutionContext): ExecutionContext with Executor =
    new BatchingExecutionContext(delegate)
}
@havocp
Copy link
Author

havocp commented Jul 18, 2012

public API could be:

object ExecutionContext {
     /** Decorate an ExecutionContext with a wrapper context
      * which groups multiple nested `Runnable.run()` calls
      * into a single Runnable passed to the original
      * ExecutionContext. This can be a useful optimization
      * because it bypasses the original context's task
      * queue and keeps related (nested) code on a single
      * thread which may improve CPU affinity. However,
      * if tasks passed to the ExecutionContext are blocking
      * or expensive, this optimization can prevent work-stealing
      * and make performance worse. Also, some ExecutionContexts
      * may be fast enough natively that this optimization just
      * adds overhead.
      * The default ExecutionContext.global is already batching
      * or fast enough not to benefit from it, while
      * `fromExecutor` and `fromExecutorService` do NOT add
      * this optimization since they don't know whether the underlying
      * executor will benefit from it.
      * A batching executor can create deadlocks if code does
      * not use `scala.concurrent.blocking` when it should,
      * because tasks created within other tasks will block
      * on the outer task completing.
      */
     def batching(context: ExecutionContext): ExecutionContext with Executor = impl.BatchingExecutionContext(context)
   }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment