Skip to content

Instantly share code, notes, and snippets.

@carter437
Forked from lancegatlin/gist:ffb64667cfb286d98686
Created September 12, 2014 02:22
Show Gist options
  • Save carter437/ea2aa3a1939f74fcb8a7 to your computer and use it in GitHub Desktop.
Save carter437/ea2aa3a1939f74fcb8a7 to your computer and use it in GitHub Desktop.
object FutureOps extends FutureOps
trait FutureOps {
def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec:ExecutionContext): Future[Option[A]] = {
if(futures.nonEmpty) {
val promise = Promise[Option[A]]()
val completedCount = new java.util.concurrent.atomic.AtomicInteger(0)
val allDoneLatch = Promise[Unit]
def maybeAllDone() {
// Note: check and execute normally creates a race but only one of the futures can cause the final count to be reached
if(completedCount.incrementAndGet() == futures.size) {
allDoneLatch.success(())
}
}
Future.sequence(futures) onComplete { case _ =>
allDoneLatch.future onSuccess { case _ =>
val exceptions = futures.map(_.value).flatten.filter(_.isFailure).map(_.failed.get)
if(!promise.isCompleted) {
// If there was at least one success that didn't pass the predicate
if(exceptions.size < futures.size) {
exceptions.foreach(ex => ec.reportFailure(ex))
promise.success(None)
} else {
promise.failure(new RuntimeException(s"All Futures Failed - StackTraces: ${exceptions.toList.zipWithIndex.map { case (e, i) => s"Exception ${i} ${ExceptionUtils.getStackTrace(e)}"}}"))
}
} else {
exceptions.foreach(ex => ec.reportFailure(ex))
}
}
}
futures foreach { x => x.onComplete {
case Success(a) if predicate(a) =>
promise.trySuccess(Some(a))
maybeAllDone()
case _ =>
maybeAllDone()
}
}
promise.future
}
else {
Future.successful(None)
}
}
def takeFirst[A](futures: Traversable[Future[A]]) = takeFirstMatch(futures,{ _:A => true})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment