Skip to content

Instantly share code, notes, and snippets.

@dvliman
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvliman/abcf784a69450e47059b to your computer and use it in GitHub Desktop.
Save dvliman/abcf784a69450e47059b to your computer and use it in GitHub Desktop.
scala future
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala.concurrent
import scala.language.higherKinds
import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable }
import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS }
import java.lang.{ Iterable => JIterable }
import java.util.{ LinkedList => JLinkedList }
import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicLong, AtomicBoolean }
import scala.util.control.NonFatal
import scala.Option
import scala.util.{Try, Success, Failure}
import scala.annotation.tailrec
import scala.collection.mutable.Builder
import scala.collection.generic.CanBuildFrom
import scala.reflect.ClassTag
/** The trait that represents futures.
*
* Asynchronous computations that yield futures are created with the `future` call:
*
* {{{
* val s = "Hello"
* val f: Future[String] = future {
* s + " future!"
* }
* f onSuccess {
* case msg => println(msg)
* }
* }}}
*
* @author Philipp Haller, Heather Miller, Aleksandar Prokopec, Viktor Klang
*
* @define multipleCallbacks
* Multiple callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
*
* @define caughtThrowables
* The future may contain a throwable object and this means that the future failed.
* Futures obtained through combinators have the same exception as the future they were obtained from.
* The following throwable objects are not contained in the future:
* - `Error` - errors are not contained within futures
* - `InterruptedException` - not contained within futures
* - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures
*
* Instead, the future is completed with a ExecutionException with one of the exceptions above
* as the cause.
* If a future is failed with a `scala.runtime.NonLocalReturnControl`,
* it is completed with a value from that throwable instead.
*
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*
* @define forComprehensionExamples
* Example:
*
* {{{
* val f = future { 5 }
* val g = future { 3 }
* val h = for {
* x: Int <- f // returns Future(5)
* y: Int <- g // returns Future(5)
* } yield x + y
* }}}
*
* is translated to:
*
* {{{
* f flatMap { (x: Int) => g map { (y: Int) => x + y } }
* }}}
*
* @define callbackInContext
* The provided callback always runs in the provided implicit
*`ExecutionContext`, though there is no guarantee that the
* `execute()` method on the `ExecutionContext` will be called once
* per callback or that `execute()` will be called in the current
* thread. That is, the implementation may run multiple callbacks
* in a batch within a single `execute()` and it may run
* `execute()` either immediately or asynchronously.
*/
trait Future[+T] extends Awaitable[T] {
// The executor within the lexical scope
// of the Future trait. Note that this will
// (modulo bugs) _never_ execute a callback
// other than those below in this same file.
//
// See the documentation on `InternalCallbackExecutor` for more details.
private def internalExecutor = Future.InternalCallbackExecutor
/* Callbacks */
/** When this future is completed successfully (i.e. with a value),
* apply the provided partial function to the value if the partial function
* is defined at that value.
*
* If the future has already been completed with a value,
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
* $callbackInContext
*/
def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Success(v) =>
pf.applyOrElse[T, Any](v, Predef.conforms[T]) // Exploiting the cached function to avoid MatchError
case _ =>
}
/** When this future is completed with a failure (i.e. with a throwable),
* apply the provided callback to the throwable.
*
* $caughtThrowables
*
* If the future has already been completed with a failure,
* this will either be applied immediately or be scheduled asynchronously.
*
* Will not be called in case that the future is completed with a value.
*
* $multipleCallbacks
* $callbackInContext
*/
def onFailure[U](callback: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
case Failure(t) =>
callback.applyOrElse[Throwable, Any](t, Predef.conforms[Throwable]) // Exploiting the cached function to avoid MatchError
case _ =>
}
/** When this future is completed, either through an exception, or a value,
* apply the provided function.
*
* If the future has already been completed,
* this will either be applied immediately or be scheduled asynchronously.
*
* $multipleCallbacks
* $callbackInContext
*/
def onComplete[U](func: Try[T] => U)(implicit executor: ExecutionContext): Unit
/* Miscellaneous */
/** Returns whether the future has already been completed with
* a value or an exception.
*
* $nonDeterministic
*
* @return `true` if the future is already completed, `false` otherwise
*/
def isCompleted: Boolean
/** The value of this `Future`.
*
* If the future is not completed the returned value will be `None`.
* If the future is completed the value will be `Some(Success(t))`
* if it contains a valid result, or `Some(Failure(error))` if it contains
* an exception.
*/
def value: Option[Try[T]]
/* Projections */
/** Returns a failed projection of this future.
*
* The failed projection is a future holding a value of type `Throwable`.
*
* It is completed with a value which is the throwable of the original future
* in case the original future is failed.
*
* It is failed with a `NoSuchElementException` if the original future is completed successfully.
*
* Blocking on this future returns a value if the original future is completed with an exception
* and throws a corresponding exception if the original future fails.
*/
def failed: Future[Throwable] = {
implicit val ec = internalExecutor
val p = Promise[Throwable]()
onComplete {
case Failure(t) => p success t
case Success(v) => p failure (new NoSuchElementException("Future.failed not completed with a throwable."))
}
p.future
}
/* Monadic operations */
/** Asynchronously processes the value in the future once the value becomes available.
*
* Will not be called if the future fails.
*/
def foreach[U](f: T => U)(implicit executor: ExecutionContext): Unit = onComplete { _ foreach f }
/** Creates a new future by applying the 's' function to the successful result of
* this future, or the 'f' function to the failed result. If there is any non-fatal
* exception thrown when 's' or 'f' is applied, that exception will be propagated
* to the resulting future.
*
* @param s function that transforms a successful result of the receiver into a
* successful result of the returned future
* @param f function that transforms a failure of the receiver into a failure of
* the returned future
* @return a future that will be completed with the transformed value
*/
def transform[S](s: T => S, f: Throwable => Throwable)(implicit executor: ExecutionContext): Future[S] = {
val p = Promise[S]()
// transform on Try has the wrong shape for us here
onComplete {
case Success(r) => p complete Try(s(r))
case Failure(t) => p complete Try(throw f(t)) // will throw fatal errors!
}
p.future
}
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
* future will also contain this exception.
*
* $forComprehensionExamples
*/
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
val p = Promise[S]()
onComplete { v => p complete (v map f) }
p.future
}
/** Creates a new future by applying a function to the successful result of
* this future, and returns the result of the function as the new future.
* If this future is completed with an exception then the new future will
* also contain this exception.
*
* $forComprehensionExamples
*/
def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
import impl.Promise.DefaultPromise
val p = new DefaultPromise[S]()
onComplete {
case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
case Success(v) => try f(v) match {
// If possible, link DefaultPromises to avoid space leaks
case dp: DefaultPromise[_] => dp.asInstanceOf[DefaultPromise[S]].linkRootOf(p)
case fut => fut.onComplete(p.complete)(internalExecutor)
} catch { case NonFatal(t) => p failure t }
}
p.future
}
/** Creates a new future by filtering the value of the current future with a predicate.
*
* If the current future contains a value which satisfies the predicate, the new future will also hold that value.
* Otherwise, the resulting future will fail with a `NoSuchElementException`.
*
* If the current future fails, then the resulting future also fails.
*
* Example:
* {{{
* val f = future { 5 }
* val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
* Await.result(g, Duration.Zero) // evaluates to 5
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*/
def filter(pred: T => Boolean)(implicit executor: ExecutionContext): Future[T] =
map {
r => if (pred(r)) r else throw new NoSuchElementException("Future.filter predicate is not satisfied")
}
/** Used by for-comprehensions.
*/
final def withFilter(p: T => Boolean)(implicit executor: ExecutionContext): Future[T] = filter(p)(executor)
/** Creates a new future by mapping the value of the current future, if the given partial function is defined at that value.
*
* If the current future contains a value for which the partial function is defined, the new future will also hold that value.
* Otherwise, the resulting future will fail with a `NoSuchElementException`.
*
* If the current future fails, then the resulting future also fails.
*
* Example:
* {{{
* val f = future { -5 }
* val g = f collect {
* case x if x < 0 => -x
* }
* val h = f collect {
* case x if x > 0 => x * 2
* }
* Await.result(g, Duration.Zero) // evaluates to 5
* Await.result(h, Duration.Zero) // throw a NoSuchElementException
* }}}
*/
def collect[S](pf: PartialFunction[T, S])(implicit executor: ExecutionContext): Future[S] =
map {
r => pf.applyOrElse(r, (t: T) => throw new NoSuchElementException("Future.collect partial function is not defined at: " + t))
}
/** Creates a new future that will handle any matching throwable that this
* future might contain. If there is no match, or if this future contains
* a valid result then the new future will contain the same.
*
* Example:
*
* {{{
* future (6 / 0) recover { case e: ArithmeticException => 0 } // result: 0
* future (6 / 0) recover { case e: NotFoundException => 0 } // result: exception
* future (6 / 2) recover { case e: ArithmeticException => 0 } // result: 3
* }}}
*/
def recover[U >: T](pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete { v => p complete (v recover pf) }
p.future
}
/** Creates a new future that will handle any matching throwable that this
* future might contain by assigning it a value of another future.
*
* If there is no match, or if this future contains
* a valid result then the new future will contain the same result.
*
* Example:
*
* {{{
* val f = future { Int.MaxValue }
* future (6 / 0) recoverWith { case e: ArithmeticException => f } // result: Int.MaxValue
* }}}
*/
def recoverWith[U >: T](pf: PartialFunction[Throwable, Future[U]])(implicit executor: ExecutionContext): Future[U] = {
val p = Promise[U]()
onComplete {
case Failure(t) => try pf.applyOrElse(t, (_: Throwable) => this).onComplete(p.complete)(internalExecutor) catch { case NonFatal(t) => p failure t }
case other => p complete other
}
p.future
}
/** Zips the values of `this` and `that` future, and creates
* a new future holding the tuple of their results.
*
* If `this` future fails, the resulting future is failed
* with the throwable stored in `this`.
* Otherwise, if `that` future fails, the resulting future is failed
* with the throwable stored in `that`.
*/
def zip[U](that: Future[U]): Future[(T, U)] = {
implicit val ec = internalExecutor
val p = Promise[(T, U)]()
onComplete {
case f: Failure[_] => p complete f.asInstanceOf[Failure[(T, U)]]
case Success(s) => that onComplete { c => p.complete(c map { s2 => (s, s2) }) }
}
p.future
}
/** Creates a new future which holds the result of this future if it was completed successfully, or, if not,
* the result of the `that` future if `that` is completed successfully.
* If both futures are failed, the resulting future holds the throwable object of the first future.
*
* Using this method will not cause concurrent programs to become nondeterministic.
*
* Example:
* {{{
* val f = future { sys.error("failed") }
* val g = future { 5 }
* val h = f fallbackTo g
* Await.result(h, Duration.Zero) // evaluates to 5
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
implicit val ec = internalExecutor
val p = Promise[U]()
onComplete {
case s @ Success(_) => p complete s
case _ => p completeWith that
}
p.future
}
/** Creates a new `Future[S]` which is completed with this `Future`'s result if
* that conforms to `S`'s erased type or a `ClassCastException` otherwise.
*/
def mapTo[S](implicit tag: ClassTag[S]): Future[S] = {
implicit val ec = internalExecutor
val boxedClass = {
val c = tag.runtimeClass
if (c.isPrimitive) Future.toBoxed(c) else c
}
require(boxedClass ne null)
map(s => boxedClass.cast(s).asInstanceOf[S])
}
/** Applies the side-effecting function to the result of this future, and returns
* a new future with the result of this future.
*
* This method allows one to enforce that the callbacks are executed in a
* specified order.
*
* Note that if one of the chained `andThen` callbacks throws
* an exception, that exception is not propagated to the subsequent `andThen`
* callbacks. Instead, the subsequent `andThen` callbacks are given the original
* value of this future.
*
* The following example prints out `5`:
*
* {{{
* val f = future { 5 }
* f andThen {
* case r => sys.error("runtime exception")
* } andThen {
* case Failure(t) => println(t)
* case Success(v) => println(v)
* }
* }}}
*/
def andThen[U](pf: PartialFunction[Try[T], U])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
onComplete {
case r => try pf.applyOrElse[Try[T], Any](r, Predef.conforms[Try[T]]) finally p complete r
}
p.future
}
}
/** Future companion object.
*
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
private[concurrent] val toBoxed = Map[Class[_], Class[_]](
classOf[Boolean] -> classOf[java.lang.Boolean],
classOf[Byte] -> classOf[java.lang.Byte],
classOf[Char] -> classOf[java.lang.Character],
classOf[Short] -> classOf[java.lang.Short],
classOf[Int] -> classOf[java.lang.Integer],
classOf[Long] -> classOf[java.lang.Long],
classOf[Float] -> classOf[java.lang.Float],
classOf[Double] -> classOf[java.lang.Double],
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
/** Creates an already completed Future with the specified exception.
*
* @tparam T the type of the value in the future
* @return the newly created `Future` object
*/
def failed[T](exception: Throwable): Future[T] = Promise.failed(exception).future
/** Creates an already completed Future with the specified result.
*
* @tparam T the type of the value in the future
* @return the newly created `Future` object
*/
def successful[T](result: T): Future[T] = Promise.successful(result).future
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
* The result becomes available once the asynchronous computation is completed.
*
* @tparam T the type of the result
* @param body the asychronous computation
* @param execctx the execution context on which the future is run
* @return the `Future` holding the result of the computation
*/
def apply[T](body: =>T)(implicit execctx: ExecutionContext): Future[T] = impl.Future(body)
/** Simple version of `Futures.traverse`. Transforms a `TraversableOnce[Future[A]]` into a `Future[TraversableOnce[A]]`.
* Useful for reducing many `Future`s into a single `Future`.
*/
def sequence[A, M[_] <: TraversableOnce[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
in.foldLeft(Promise.successful(cbf(in)).future) {
(fr, fa) => for (r <- fr; a <- fa.asInstanceOf[Future[A]]) yield (r += a)
} map (_.result)
}
/** Returns a new `Future` to the result of the first future in the list that is completed.
*/
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val completeFirst: Try[T] => Unit = p tryComplete _
futures foreach { _ onComplete completeFirst }
p.future
}
/** Returns a `Future` that will hold the optional result of the first `Future` with a result that matches the predicate.
*/
def find[T](futurestravonce: TraversableOnce[Future[T]])(predicate: T => Boolean)(implicit executor: ExecutionContext): Future[Option[T]] = {
val futures = futurestravonce.toBuffer
if (futures.isEmpty) Promise.successful[Option[T]](None).future
else {
val result = Promise[Option[T]]()
val ref = new AtomicInteger(futures.size)
val search: Try[T] => Unit = v => try {
v match {
case Success(r) => if (predicate(r)) result tryComplete Success(Some(r))
case _ =>
}
} finally {
if (ref.decrementAndGet == 0) {
result tryComplete Success(None)
}
}
futures.foreach(_ onComplete search)
result.future
}
}
/** A non-blocking fold over the specified futures, with the start value of the given zero.
* The fold is performed on the thread where the last future is completed,
* the result will be the first failure of any of the futures, or any failure in the actual fold,
* or the result of the fold.
*
* Example:
* {{{
* val result = Await.result(Future.fold(futures)(0)(_ + _), 5 seconds)
* }}}
*/
def fold[T, R](futures: TraversableOnce[Future[T]])(zero: R)(foldFun: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Future.successful(zero)
else sequence(futures).map(_.foldLeft(zero)(foldFun))
}
/** Initiates a fold over the supplied futures where the fold-zero is the result value of the `Future` that's completed first.
*
* Example:
* {{{
* val result = Await.result(Future.reduce(futures)(_ + _), 5 seconds)
* }}}
*/
def reduce[T, R >: T](futures: TraversableOnce[Future[T]])(op: (R, T) => R)(implicit executor: ExecutionContext): Future[R] = {
if (futures.isEmpty) Future.failed(new NoSuchElementException("reduce attempted on empty collection"))
else sequence(futures).map(_ reduceLeft op)
}
/** Transforms a `TraversableOnce[A]` into a `Future[TraversableOnce[B]]` using the provided function `A => Future[B]`.
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel:
*
* {{{
* val myFutureList = Future.traverse(myList)(x => Future(myFunc(x)))
* }}}
*/
def traverse[A, B, M[_] <: TraversableOnce[_]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
in.foldLeft(Promise.successful(cbf(in)).future) { (fr, a) =>
val fb = fn(a.asInstanceOf[A])
for (r <- fr; b <- fb) yield (r += b)
}.map(_.result)
// This is used to run callbacks which are internal
// to scala.concurrent; our own callbacks are only
// ever used to eventually run another callback,
// and that other callback will have its own
// executor because all callbacks come with
// an executor. Our own callbacks never block
// and have no "expected" exceptions.
// As a result, this executor can do nothing;
// some other executor will always come after
// it (and sometimes one will be before it),
// and those will be performing the "real"
// dispatch to code outside scala.concurrent.
// Because this exists, ExecutionContext.defaultExecutionContext
// isn't instantiated by Future internals, so
// if some code for some reason wants to avoid
// ever starting up the default context, it can do so
// by just not ever using it itself. scala.concurrent
// doesn't need to create defaultExecutionContext as
// a side effect.
private[concurrent] object InternalCallbackExecutor extends ExecutionContext with java.util.concurrent.Executor {
override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
/**
* The BatchingExecutor trait had to be inlined into InternalCallbackExecutor for binary compatibility.
*
* BatchingExecutor is a trait for an Executor
* which groups multiple nested `Runnable.run()` calls
* into a single Runnable passed to the original
* Executor. This can be a useful optimization
* because it bypasses the original context's task
* queue and keeps related (nested) code on a single
* thread which may improve CPU affinity. However,
* if tasks passed to the Executor are blocking
* or expensive, this optimization can prevent work-stealing
* and make performance worse. Also, some ExecutionContext
* may be fast enough natively that this optimization just
* adds overhead.
* The default ExecutionContext.global is already batching
* or fast enough not to benefit from it; while
* `fromExecutor` and `fromExecutorService` do NOT add
* this optimization since they don't know whether the underlying
* executor will benefit from it.
* A batching executor can create deadlocks if code does
* not use `scala.concurrent.blocking` when it should,
* because tasks created within other tasks will block
* on the outer task completing.
* This executor may run tasks in any order, including LIFO order.
* There are no ordering guarantees.
*
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
*/
// invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside
private val _tasksLocal = new ThreadLocal[List[Runnable]]()
private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
private[this] var parentBlockContext: BlockContext = _
// this method runs in the delegate ExecutionContext's thread
override def run(): Unit = {
require(_tasksLocal.get eq null)
val prevBlockContext = BlockContext.current
BlockContext.withBlockContext(this) {
try {
parentBlockContext = prevBlockContext
@tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
case Nil => ()
case head :: tail =>
_tasksLocal set tail
try {
head.run()
} catch {
case t: Throwable =>
// if one task throws, move the
// remaining tasks to another thread
// so we can throw the exception
// up to the invoking executor
val remaining = _tasksLocal.get
_tasksLocal set Nil
unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
throw t // rethrow
}
processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
}
processBatch(initial)
} finally {
_tasksLocal.remove()
parentBlockContext = null
}
}
}
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
// if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
{
val tasks = _tasksLocal.get
_tasksLocal set Nil
if ((tasks ne null) && tasks.nonEmpty)
unbatchedExecute(new Batch(tasks))
}
// now delegate the blocking to the previous BC
require(parentBlockContext ne null)
parentBlockContext.blockOn(thunk)
}
}
override def execute(runnable: Runnable): Unit = runnable match {
// If we can batch the runnable
case _: OnCompleteRunnable =>
_tasksLocal.get match {
case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
}
// If not batchable, just delegate to underlying
case _ =>
unbatchedExecute(runnable)
}
private def unbatchedExecute(r: Runnable): Unit = r.run()
}
}
/** A marker indicating that a `java.lang.Runnable` provided to `scala.concurrent.ExecutionContext`
* wraps a callback provided to `Future.onComplete`.
* All callbacks provided to a `Future` end up going through `onComplete`, so this allows an
* `ExecutionContext` to special-case callbacks that were executed by `Future` if desired.
*/
trait OnCompleteRunnable {
self: Runnable =>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment