Skip to content

Instantly share code, notes, and snippets.

@lancegatlin
Forked from carter437/gist:daa209c27c140e42d381
Last active August 29, 2015 14:06
Show Gist options
  • Save lancegatlin/ffb64667cfb286d98686 to your computer and use it in GitHub Desktop.
Save lancegatlin/ffb64667cfb286d98686 to your computer and use it in GitHub Desktop.
object FutureOps extends FutureOps
trait FutureOps {
def toTry[A](self: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] = {
val p = Promise[Try[A]]()
self onComplete { case result => p.success(result) }
p.future
}
def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec:ExecutionContext): Future[Option[A]] = {
if(futures.nonEmpty) {
val promise = Promise[Option[A]]()
val completedCount = new java.util.concurrent.atomic.AtomicInteger(0)
val allDoneLatch = Promise[Unit])()
def maybeAllDone() {
// Note: check and execute normally creates a race but only one of the futures can cause the final count to be reached
if(completedCount.incrementAndGet() == futures.size) {
allDoneLatch.success(())
}
}
// Note: Future.sequence can't fail here since toTry never fails
Future.sequence(futures.map(_.toTry)) onSuccess { case completedFutures =>
// Note: allDoneLatch can only succeed
allDoneLatch.future onSuccess { case _ =>
val exceptions = completedFutures.flatMap(_.failed.toOption)
if(promise.isCompleted == false) {
// If there was at least one success that didn't pass the predicate
if(exceptions.size < futures.size) {
exceptions.foreach(ex => ec.reportFailure(ex))
promise.success(None)
} else {
promise.failed(new RuntimeException(s"All Futures Failed - StackTraces: ${exceptions.zipWithIndex.map { case (e, i) => s"Exception ${i} ${ExceptionUtils.getStackTrace(e)}"}}"))
}
} else {
// Ensure any exceptions that happened aren't blackholed when this method succeeds
exceptions.foreach(ex => ec.reportFailure(ex))
}
}
}
futures foreach { _.onComplete {
case Success(a) if predicate(a) =>
promise.trySuccess(Some(a))
maybeAllDone()
case _ =>
maybeAllDone()
}
promise.future
} else {
Future.successful(None)
}
}
def takeFirst[A](futures: Traversable[Future[A]]) = takeFirstMatch(futures,{ _:A => true})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment