Last active
August 1, 2017 09:52
-
-
Save stephennancekivell/6583eaf499f461f0c9c56a394507e229 to your computer and use it in GitHub Desktop.
Scala Future gatherUnordered
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Like Future.sequence but but limits the parallelism to only process so many futures at once. | |
* Useful if you have lots of future operations but dont want to overload something with too many at once. | |
*/ | |
def gatherUnordered[A, B](parallelism: Int, | |
xs: Seq[A])(fn: A => Future[B])(implicit ex: ExecutionContext): Future[Seq[B]] = { | |
def go(todo: Seq[A], inprogress: Seq[Future[B]], acc: Seq[B]): Future[Seq[B]] = { | |
if (inprogress.size < parallelism && todo.nonEmpty) { | |
val numToAdd = parallelism - inprogress.size | |
val (toStart, stillTodo) = todo.splitAt(numToAdd) | |
go(stillTodo, toStart.map(fn) ++ inprogress, acc) | |
} else if (todo.isEmpty && inprogress.isEmpty) { | |
Future.successful(acc) | |
} else { | |
Future.firstCompletedOf(inprogress).flatMap { _ => | |
val (doneFutures, stillInProgress) = inprogress.partition(_.isCompleted) | |
Future.sequence(doneFutures).flatMap { done => | |
go(todo, stillInProgress, acc ++ done) | |
} | |
} | |
} | |
} | |
go(xs, Nil, Nil) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment