Skip to content

Instantly share code, notes, and snippets.

@havocp
Created May 2, 2012 20:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save havocp/2580236 to your computer and use it in GitHub Desktop.
Save havocp/2580236 to your computer and use it in GitHub Desktop.
don't batch tasks across executors, and consolidate some closures and Runnables into one Task object
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 957f9cf..0dcfa87 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -44,13 +44,11 @@ private[concurrent] object Future {
}
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
-
- def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val promise = new Promise.DefaultPromise[T]()
- //TODO: use `dispatchFuture`?
- executor.execute(new Runnable {
- def run = promise complete {
+ private[impl] class BodyTask[T](override val executor: ExecutionContext, body: => T, val promise: Promise[T])
+ extends Task {
+ protected override def task() = {
+ promise complete {
try Right(body) catch {
case NonFatal(e) =>
// Commenting out reporting for now, since it produces too much output in the tests
@@ -58,8 +56,15 @@ private[concurrent] object Future {
scala.concurrent.resolver(e)
}
}
- })
-
+ }
+ }
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val promise = new Promise.DefaultPromise[T]()
+
+ val task = new BodyTask(executor, body, promise) // body is a by-name parameter of BodyTask, so it is not evaluated here
+ task.dispatch() // pushed onto this thread's task stack if one is active for the same executor, otherwise executor.execute
+
promise.future
}
@@ -68,38 +73,77 @@ private[concurrent] object Future {
// so that it can be stolen from
// OR: a push to the local task queue should be so cheap that this is
// not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext) // pending tasks plus the executor whose run() is draining them
+
+ private val _taskStack = new ThreadLocal[TaskStack]() // per-thread: set by Task.run() while it drains, removed when it finishes
+
+ private[impl] trait Task extends Runnable { // a unit of work bound to one ExecutionContext; it is itself the Runnable handed to that context
+ def executor: ExecutionContext // the context this task is submitted to and reports failures on
+
+ // the actual work: runs the wrapped callback directly, without any dispatching
+ protected def task(): Unit
+
+ // we implement Runnable to avoid creating
+ // an extra object. run() runs ourselves with
+ // a TaskStack pushed, and then runs any
+ // other tasks that show up in the stack.
+ final override def run() = {
+ try {
+ val taskStack = TaskStack(Stack[Task](this), executor) // seed the stack with ourselves so the loop below runs us first
+ _taskStack set taskStack // publish it so dispatch() calls made on this thread can push onto it
+ while (taskStack.stack.nonEmpty) {
+ val next = taskStack.stack.pop()
+ require(next.executor eq executor) // everything on this stack must target our executor (dispatch() only pushes matching tasks)
+ try next.task() catch { case NonFatal(e) => executor reportFailure e } // a failing task is reported and does not stop the remaining ones
+ }
+ } finally {
+ _taskStack.remove() // always clear the thread-local, even when require or a fatal error escapes
+ }
+ }
+
+ // send the task to the running executor.execute() via
+ // _taskStack, or start a new executor.execute()
+ def dispatch(force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this // same executor is already draining a stack on this thread: batch onto it
+ case _ => executor.execute(this) // no stack on this thread, a different executor, or force = true
+ }
+ }
+
+ private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task]) // carries a saved batch of tasks so it can be resumed from a fresh run()
+ extends Task {
+ protected override def task() = {
+ _taskStack.get.stack.elems = elems // when called from run(), this task has already been popped; refilling the stack makes run()'s loop drain the saved tasks next
+ }
+ }
private[impl] def releaseStack(executor: ExecutionContext): Unit =
_taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
+ case stack if (stack ne null) && stack.stack.nonEmpty =>
+ val tasks = stack.stack.elems // keep the pending tasks before the stack is emptied
+ stack.stack.clear()
_taskStack.remove()
- dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ val release = new ReleaseTask(stack.executor, tasks) // replay on the executor the tasks were queued for: Task.run() requires every replayed task's executor to be the ReleaseTask's own
+ release.dispatch(force=true) // force: skip thread-local batching and go straight to executor.execute
case null =>
// do nothing - there is no local batching stack anymore
case _ =>
_taskStack.remove()
}
- private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373
- case _ => executor.execute(new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try next() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
- })
+ private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any) // a completion callback packaged as a Task so it is queued and batched like any other
+ extends Task {
+ private var value: Either[Throwable, T] = null // result to hand to the callback; stays null until dispatch(value) stores it
+
+ protected override def task() = {
+ require(value ne null) // dispatch(value) must be called before dispatch()
+ onComplete(value) // a NonFatal exception thrown here is caught in Task.run() and passed to executor.reportFailure
}
-
+
+ def dispatch(value: Either[Throwable, T]): Unit = {
+ this.value = value // stored before scheduling so task() finds it
+ dispatch() // the inherited Task.dispatch with force left at its default (false)
+ }
+ }
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 0beb1f5..426f60f 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -90,10 +90,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
+ val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -104,36 +104,21 @@ object Promise {
}) match {
case null => false
case cs if cs.isEmpty => true
- // this assumes that bindDispatch() was called to create f,
- // so it will go via dispatchFuture and notifyCompleted
- case cs => cs.foreach(f => f(resolved)); true
+ case cs => cs.foreach(c => c.dispatch(resolved)); true
}
}
- private def bindDispatch(func: Either[Throwable, T] => Any)(implicit executor: ExecutionContext): Either[Throwable, T] => Unit = {
- either: Either[Throwable, T] =>
- Future.dispatchFuture(executor, () => notifyCompleted(func, either))
- }
-
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): this.type = {
- val bound = bindDispatch(func)
+ val bound = new Future.OnCompleteTask[T](executor, func) // one task object per callback; below it is either dispatched right away or stored as a listener
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]])
+ case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]]) // state already holds a result: give it to the task and schedule it now
case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
this
}
-
- private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) {
- try {
- func(result)
- } catch {
- case NonFatal(e) => executor reportFailure e
- }
- }
}
/** An already completed Future is given its result at creation.
@@ -150,7 +135,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): this.type = {
val completedAs = value.get
- Future.dispatchFuture(executor, () => func(completedAs))
+ (new Future.OnCompleteTask(executor, func)).dispatch(completedAs) // the result is already available, so the callback is scheduled immediately
this
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment