Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
don't batch tasks across executors, and consolidate some closures and Runnables into one Task object
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 957f9cf..0dcfa87 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -44,13 +44,11 @@ private[concurrent] object Future {
}
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) toBoxed(c) else c
-
- def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
- val promise = new Promise.DefaultPromise[T]()
- //TODO: use `dispatchFuture`?
- executor.execute(new Runnable {
- def run = promise complete {
+ private[impl] class BodyTask[T](override val executor: ExecutionContext, body: => T, val promise: Promise[T])
+ extends Task {
+ protected override def task() = {
+ promise complete {
try Right(body) catch {
case NonFatal(e) =>
// Commenting out reporting for now, since it produces too much output in the tests
@@ -58,8 +56,15 @@ private[concurrent] object Future {
scala.concurrent.resolver(e)
}
}
- })
-
+ }
+ }
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val promise = new Promise.DefaultPromise[T]()
+
+ val task = new BodyTask(executor, body, promise)
+ task.dispatch()
+
promise.future
}
@@ -68,38 +73,77 @@ private[concurrent] object Future {
// so that it can be stolen from
// OR: a push to the local task queue should be so cheap that this is
// not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] case class TaskStack(stack: Stack[Task], executor: ExecutionContext)
+
+ private val _taskStack = new ThreadLocal[TaskStack]()
+
+ private[impl] trait Task extends Runnable {
+ def executor: ExecutionContext
+
+ // run the original callback (no dispatch)
+ protected def task(): Unit
+
+ // We implement Runnable directly to avoid allocating
+ // an extra wrapper object. run() installs a TaskStack
+ // holding this task, then drains that stack: it runs this
+ // task and any other tasks pushed onto the stack meanwhile.
+ final override def run() = {
+ try {
+ val taskStack = TaskStack(Stack[Task](this), executor)
+ _taskStack set taskStack
+ while (taskStack.stack.nonEmpty) {
+ val next = taskStack.stack.pop()
+ require(next.executor eq executor)
+ try next.task() catch { case NonFatal(e) => executor reportFailure e }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+
+ // push the task onto this thread's active TaskStack when that stack belongs
+ // to the same executor and force is false; otherwise call executor.execute()
+ def dispatch(force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && (executor eq stack.executor) && !force => stack.stack push this
+ case _ => executor.execute(this)
+ }
+ }
+
+ private[impl] class ReleaseTask(override val executor: ExecutionContext, val elems: List[Task])
+ extends Task {
+ protected override def task() = {
+ _taskStack.get.stack.elems = elems
+ }
+ }
private[impl] def releaseStack(executor: ExecutionContext): Unit =
_taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
+ case stack if (stack ne null) && stack.stack.nonEmpty =>
+ val tasks = stack.stack.elems
+ stack.stack.clear()
_taskStack.remove()
- dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ val release = new ReleaseTask(executor, tasks)
+ release.dispatch(force=true)
case null =>
// do nothing - there is no local batching stack anymore
case _ =>
_taskStack.remove()
}
- private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task // FIXME we can't mix tasks aimed for different ExecutionContexts see: https://github.com/akka/akka/blob/v2.0.1/akka-actor/src/main/scala/akka/dispatch/Future.scala#L373
- case _ => executor.execute(new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try next() catch { case NonFatal(e) => executor reportFailure e }
- }
- } finally {
- _taskStack.remove()
- }
- }
- })
+ private[impl] class OnCompleteTask[T](override val executor: ExecutionContext, val onComplete: (Either[Throwable, T]) => Any)
+ extends Task {
+ private var value: Either[Throwable, T] = null
+
+ protected override def task() = {
+ require(value ne null) // dispatch(value) must be called before dispatch()
+ onComplete(value)
}
-
+
+ def dispatch(value: Either[Throwable, T]): Unit = {
+ this.value = value
+ dispatch()
+ }
+ }
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 0beb1f5..426f60f 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -90,10 +90,10 @@ object Promise {
val resolved = resolveEither(value)
(try {
@tailrec
- def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = {
+ def tryComplete(v: Either[Throwable, T]): List[Future.OnCompleteTask[T]] = {
getState match {
case raw: List[_] =>
- val cur = raw.asInstanceOf[List[Either[Throwable, T] => Unit]]
+ val cur = raw.asInstanceOf[List[Future.OnCompleteTask[T]]]
if (updateState(cur, v)) cur else tryComplete(v)
case _ => null
}
@@ -104,36 +104,21 @@ object Promise {
}) match {
case null => false
case cs if cs.isEmpty => true
- // this assumes that bindDispatch() was called to create f,
- // so it will go via dispatchFuture and notifyCompleted
- case cs => cs.foreach(f => f(resolved)); true
+ case cs => cs.foreach(c => c.dispatch(resolved)); true
}
}
- private def bindDispatch(func: Either[Throwable, T] => Any)(implicit executor: ExecutionContext): Either[Throwable, T] => Unit = {
- either: Either[Throwable, T] =>
- Future.dispatchFuture(executor, () => notifyCompleted(func, either))
- }
-
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): this.type = {
- val bound = bindDispatch(func)
+ val bound = new Future.OnCompleteTask[T](executor, func)
@tailrec //Tries to add the callback, if already completed, it dispatches the callback to be executed
def dispatchOrAddCallback(): Unit =
getState match {
- case r: Either[_, _] => bound(r.asInstanceOf[Either[Throwable, T]])
+ case r: Either[_, _] => bound.dispatch(r.asInstanceOf[Either[Throwable, T]])
case listeners: List[_] => if (updateState(listeners, bound :: listeners)) () else dispatchOrAddCallback()
}
dispatchOrAddCallback()
this
}
-
- private final def notifyCompleted(func: Either[Throwable, T] => Any, result: Either[Throwable, T])(implicit executor: ExecutionContext) {
- try {
- func(result)
- } catch {
- case NonFatal(e) => executor reportFailure e
- }
- }
}
/** An already completed Future is given its result at creation.
@@ -150,7 +135,7 @@ object Promise {
def onComplete[U](func: Either[Throwable, T] => U)(implicit executor: ExecutionContext): this.type = {
val completedAs = value.get
- Future.dispatchFuture(executor, () => func(completedAs))
+ (new Future.OnCompleteTask(executor, func)).dispatch(completedAs)
this
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment