@ches
Created June 2, 2016 13:03
Extra Future combinators for Scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.generic.CanBuildFrom
import scala.concurrent._
import scala.util.{ Failure, Success }
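/*
 * Enrichments for some useful Future combinators absent from the standard
 * library.
 *
 * Note: Scala 2 does not allow an implicit class at the top level of a file,
 * so place this inside an object or package object.
 *
 * TODO: write some tests for these! :-)
 */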
implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

  /**
   * Like `Future.sequence`, transforms a `TraversableOnce[Future[A]]` into a
   * `Future[TraversableOnce[A]]`, but does '''not''' wait for all futures to
   * complete: if any one fails, that failure is returned immediately.
   *
   * Semantically equivalent to ECMAScript 6's `Promise.all`.
   *
   * Note: Using this method may result in non-deterministic concurrent programs,
   * since the failure reported is whichever one happens to occur first.
   *
   * @see http://stackoverflow.com/a/16258132/455009
   */
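  def all[A, M[X] <: TraversableOnce[X]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
    val p = Promise[M[A]]()
    // Materialize first: `in` may be a single-pass Iterator, and it is traversed twice below
    val futures = in.toVector

    // If there is a failure, complete the Promise immediately
    // (`failed.foreach` replaces `onFailure`, which is deprecated as of Scala 2.12)
    futures foreach { _.failed foreach p.tryFailure }

    // Else complete with the aggregate success, rebuilt into the caller's collection type
    Future.sequence(futures) foreach { results => p.trySuccess((cbf(in) ++= results).result()) }

    p.future
  }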

  /**
   * Like `Future.firstCompletedOf`, but returns the first ''successful'' future
   * rather than any completed one. If every future fails, returns the failure of
   * the last one to fail. If `futures` is empty, the returned future never
   * completes. Could reasonably be called `firstSucceededOf` like Ray Roestenburg
   * first called it, but I'm going for the any/all dual here.
   *
   * Note: Using this method may result in non-deterministic concurrent programs.
   *
   * @see https://gist.github.com/RayRoestenburg/6096365
   */
  def any[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
    // Materialize first: calling `size` on a single-pass Iterator would exhaust it
    val fs = futures.toVector
    val remaining = new AtomicInteger(fs.size)
    val p = Promise[T]()

    fs foreach {
      _ onComplete {
        case Success(v) => p.trySuccess(v)
        // Only the last future to fail gets to fail the Promise
        case Failure(e) => if (remaining.decrementAndGet() == 0) p.tryFailure(e)
      }
    }

    p.future
  }
}
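
The doc comments above describe `all` as fail-fast and `any` as first-success-wins. The following is a minimal usage sketch of those two behaviours, not the gist's own code: `FutureExtras` and `FutureExtrasDemo` are hypothetical names, and the combinators are simplified stand-ins fixed to `Seq` (an assumption made so the example compiles without `CanBuildFrom`).

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

// Hypothetical holder object; an implicit class cannot sit at the top level.
object FutureExtras {
  implicit class FutureCompanionOps(val f: Future.type) extends AnyVal {

    // Simplified stand-in for `all`, fixed to Seq so no CanBuildFrom is needed.
    def all[A](in: Seq[Future[A]])(implicit ec: ExecutionContext): Future[Seq[A]] = {
      val p = Promise[Seq[A]]()
      in.foreach(_.failed.foreach(p.tryFailure))   // the first failure wins
      Future.sequence(in).foreach(p.trySuccess)    // otherwise the full result
      p.future
    }

    // Simplified stand-in for `any`, also fixed to Seq.
    def any[T](futures: Seq[Future[T]])(implicit ec: ExecutionContext): Future[T] = {
      val remaining = new AtomicInteger(futures.size)
      val p = Promise[T]()
      futures.foreach {
        _.onComplete {
          case Success(v) => p.trySuccess(v)
          case Failure(e) => if (remaining.decrementAndGet() == 0) p.tryFailure(e)
        }
      }
      p.future
    }
  }
}

object FutureExtrasDemo {
  import FutureExtras._
  import scala.concurrent.ExecutionContext.Implicits.global

  def main(args: Array[String]): Unit = {
    // A future that never completes, standing in for a very slow task.
    val never: Future[Int] = Promise[Int]().future
    def boom: Future[Int] = Future.failed(new RuntimeException("boom"))

    // `all` reports the failure without waiting for `never`.
    println(Try(Await.result(Future.all(Seq(never, boom)), 1.second)))

    // `any` skips the failure and the pending future and yields the success.
    println(Await.result(Future.any(Seq(boom, never, Future.successful(1))), 1.second))
  }
}
```

The never-completing future is what makes the difference visible: any combinator that waited for every input would time out in `Await.result` instead of returning.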