Skip to content

Instantly share code, notes, and snippets.

@natewave
Created December 11, 2015 15:54
Show Gist options
  • Save natewave/8ad4e74b8fa253bb1f71 to your computer and use it in GitHub Desktop.
Save natewave/8ad4e74b8fa253bb1f71 to your computer and use it in GitHub Desktop.
import scala.concurrent.{ Future, ExecutionContext }
/** Helpers for running asynchronous work that must be serialized per key. */
object GroupedAsync {

  /**
   * Applies `fs` to every element of `input`, serializing the calls that share
   * a key while letting the different key groups progress independently.
   *
   * Elements are grouped by their key (the first tuple component). Within a
   * group, `fs` is invoked for an element only after the future returned for
   * the previous element of that group has completed successfully, so the
   * per-key results keep the relative order the elements had in `input`.
   * The chains for different keys are all started before any of them is
   * waited on; how much they actually overlap depends on `ec`.
   *
   * The returned sequence is the concatenation of the per-key results. Groups
   * appear in the iteration order of the grouping map, which is not
   * necessarily the order in which keys first appear in `input`.
   *
   * If any call to `fs` fails, the returned future fails, and the remaining
   * elements of that key are never passed to `fs`.
   *
   * @tparam K key type used for grouping
   * @tparam V input value type
   * @tparam R result value type
   * @param fs    asynchronous function applied to each `(key, value)` pair
   * @param input pairs to process
   * @param ec    execution context used to chain and combine the futures
   * @return a future holding one `(key, result)` pair per input element
   */
  def sequencePar[K, V, R](fs: ((K, V)) => Future[(K, R)])(input: Seq[(K, V)])(implicit ec: ExecutionContext): Future[Seq[(K, R)]] = {
    // Chains the calls for one key group: each `fs` call starts only once the
    // previous one has completed. `Future.successful` seeds the chain without
    // scheduling a task, and a Vector keeps each append cheap.
    def runInOrder(elements: Seq[(K, V)]): Future[Vector[(K, R)]] =
      elements.foldLeft(Future.successful(Vector.empty[(K, R)])) { (previousFuture, element) =>
        for {
          previousResults <- previousFuture
          result <- fs(element)
        } yield previousResults :+ result
      }

    // Build every per-key chain strictly (no lazy `mapValues` view) so that all
    // groups are started here, before the results are combined.
    val chains: Vector[Future[Vector[(K, R)]]] =
      input.groupBy(_._1).valuesIterator.map(runInOrder).toVector

    Future.sequence(chains).map(_.flatten)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment