Skip to content

Instantly share code, notes, and snippets.

object FutureOps extends FutureOps
trait FutureOps {
def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec:ExecutionContext): Future[Option[A]] = {
if(futures.nonEmpty) {
val promise = Promise[Option[A]]()
val completedCount = new java.util.concurrent.atomic.AtomicInteger(0)
val allDoneLatch = Promise[Unit]
def maybeAllDone() {
// Note: check and execute normally creates a race but only one of the futures can cause the final count to be reached
if(completedCount.incrementAndGet() == futures.size) {
object FutureOps extends FutureOps
trait FutureOps {
/** Lifts the outcome of `self` into a `Try`.
  *
  * The returned future is completed successfully with whatever `Try` the
  * source future finishes with, so a failure of `self` surfaces as a
  * `Failure` value rather than as a failed future.
  *
  * @param self the future whose outcome is captured
  * @param ec   execution context on which the completion callback runs
  * @tparam A   result type of the source future
  * @return a future holding the `Try` that `self` completed with
  */
def toTry[A](self: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] = {
  val lifted = Promise[Try[A]]()
  // onComplete hands over the finished Try; store it as a successful value.
  self.onComplete(outcome => lifted.success(outcome))
  lifted.future
}
def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec:ExecutionContext): Future[Option[A]] = {
if(futures.nonEmpty) {
val promise = Promise[Option[A]]()
@carter437
carter437 / gist:daa209c27c140e42d381
Last active August 29, 2015 14:06
Basic Future takeFirstMatch
object FutureOps extends FutureOps
trait FutureOps {
def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean): Future[Option[A]] = {
val promise = Promise[Option[A]]
def aggregateFailures(futuresToFailureCheck: Traversable[Future[A]], exceptions: Seq[Throwable]): Unit = {
futuresToFailureCheck match {
case Nil => {
Promise.failed(new RuntimeException(s"All Futures Failed - StackTraces: ${exceptions.zipWithIndex.map { case (e, i) => s"Exception ${i} ${ExceptionUtils.getStackTrace(e)}"}}"))
}
case head :: tail => head onFailure { case ex => aggregateFailures(tail, exceptions :+ ex)}