@stephennancekivell
Last active August 1, 2017 09:52
Scala Future gatherUnordered
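import scala.concurrent.{ExecutionContext, Future}

/**
 * Like Future.sequence, but limits the parallelism so that at most `parallelism` futures are in flight at once.
 * Useful if you have lots of future operations but don't want to overload something with too many at once.
 * Results are collected as futures complete, so their order may differ from the input order.
 * If any future fails, the returned future fails as well.
 */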
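def gatherUnordered[A, B](parallelism: Int, xs: Seq[A])(fn: A => Future[B])(
    implicit ex: ExecutionContext): Future[Seq[B]] = {
  // With parallelism <= 0 nothing could ever start, and Future.firstCompletedOf
  // on an empty Seq never completes, so reject it up front.
  require(parallelism > 0, "parallelism must be positive")

  def go(todo: Seq[A], inprogress: Seq[Future[B]], acc: Seq[B]): Future[Seq[B]] = {
    if (inprogress.size < parallelism && todo.nonEmpty) {
      // There is spare capacity: start enough new futures to fill it.
      val numToAdd = parallelism - inprogress.size
      val (toStart, stillTodo) = todo.splitAt(numToAdd)
      go(stillTodo, toStart.map(fn) ++ inprogress, acc)
    } else if (todo.isEmpty && inprogress.isEmpty) {
      // Nothing left to start or to wait for.
      Future.successful(acc)
    } else {
      // Wait for at least one in-flight future, then collect every future that has finished.
      Future.firstCompletedOf(inprogress).flatMap { _ =>
        val (doneFutures, stillInProgress) = inprogress.partition(_.isCompleted)
        Future.sequence(doneFutures).flatMap { done =>
          go(todo, stillInProgress, acc ++ done)
        }
      }
    }
  }

  go(xs, Nil, Nil)
}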
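A minimal usage sketch, not part of the original gist: it runs ten slow operations with a parallelism of 3 and records how many were in flight at once. The names `GatherUnorderedDemo`, `slowSquare` and `run` are hypothetical, and `gatherUnordered` is repeated inside the object only so that the example compiles on its own.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GatherUnorderedDemo {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Copy of gatherUnordered from above, repeated so this sketch is self-contained.
  def gatherUnordered[A, B](parallelism: Int, xs: Seq[A])(fn: A => Future[B])(
      implicit ex: ExecutionContext): Future[Seq[B]] = {
    def go(todo: Seq[A], inprogress: Seq[Future[B]], acc: Seq[B]): Future[Seq[B]] = {
      if (inprogress.size < parallelism && todo.nonEmpty) {
        val (toStart, stillTodo) = todo.splitAt(parallelism - inprogress.size)
        go(stillTodo, toStart.map(fn) ++ inprogress, acc)
      } else if (todo.isEmpty && inprogress.isEmpty) {
        Future.successful(acc)
      } else {
        Future.firstCompletedOf(inprogress).flatMap { _ =>
          val (doneFutures, stillInProgress) = inprogress.partition(_.isCompleted)
          Future.sequence(doneFutures).flatMap(done => go(todo, stillInProgress, acc ++ done))
        }
      }
    }
    go(xs, Nil, Nil)
  }

  // Counters guarded by a lock, used to observe how many operations overlap.
  private val lock = new Object
  private var inFlight = 0
  private var maxInFlight = 0

  // Hypothetical slow operation standing in for a remote call.
  def slowSquare(n: Int): Future[Int] = Future {
    lock.synchronized {
      inFlight += 1
      maxInFlight = math.max(maxInFlight, inFlight)
    }
    try {
      Thread.sleep(20)
      n * n
    } finally {
      lock.synchronized { inFlight -= 1 }
    }
  }

  // Returns the gathered results and the highest concurrency observed.
  def run(): (Seq[Int], Int) = {
    val results = Await.result(gatherUnordered(3, 1 to 10)(slowSquare), 10.seconds)
    (results, lock.synchronized(maxInFlight))
  }
}
```

Calling `GatherUnorderedDemo.run()` should return all ten squares together with an observed concurrency of at most 3. The results are not guaranteed to be in input order, so sort them before comparing against an expected sequence.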