Skip to content

Instantly share code, notes, and snippets.

@alexeevg
Last active August 29, 2015 14:03
Show Gist options
  • Save alexeevg/973bf5ee470fdf819dcf to your computer and use it in GitHub Desktop.
Save alexeevg/973bf5ee470fdf819dcf to your computer and use it in GitHub Desktop.
/**
 * Runs `process` over every element produced by `input`, handling the
 * resulting futures in groups built by `chunker(parallelism)`.
 *
 * Each element is first mapped through `process`, which yields a
 * `Future[Unit]` per element. Those futures are collected into lists by
 * `chunker(parallelism)`, and the consuming fold completes a step only once
 * every future in the current list has completed (via `Future.sequence`).
 *
 * @param input       source of the elements to process
 * @param parallelism group size handed to `chunker`
 *                    (NOTE(review): presumably expected to be positive; the
 *                    behaviour for zero or negative values is not established
 *                    here -- confirm before relying on it)
 * @param process     asynchronous action started for each element
 * @param ec          execution context used for the stream stages and for
 *                    combining the futures
 * @tparam T          element type of `input`
 * @return the future obtained by running the fold over the grouped stream;
 *         its successful value is `()`
 */
def processBatches[T](input: Enumerator[T], parallelism: Int)
                     (process: T => Future[Unit])
                     (implicit ec: ExecutionContext): Future[Unit] = {
  // Stage 1: turn each element into its future, then bundle the futures into lists.
  val startThenGroup =
    Enumeratee.map[T](process) compose chunker[Future[Unit]](parallelism)

  // Stage 2: a fold step finishes only when every future of the current list has.
  val awaitEachGroup =
    Iteratee.foldM[List[Future[Unit]], Unit](()) { (_, batch) =>
      Future.sequence(batch).map(_ => ())
    }

  input.through(startThenGroup).run(awaitEachGroup)
}
/**
 * Builds an `Enumeratee` that regroups a stream of `T` into a stream of
 * `List[T]`.
 *
 * Every emitted list is produced by an inner iteratee that takes up to
 * `size` elements (`Enumeratee.take(size)`) and gathers them with
 * `Iteratee.getChunks`.
 *
 * @param size upper bound on the number of elements gathered into one list
 *             (NOTE(review): presumably expected to be positive -- confirm)
 * @tparam T   element type of the incoming stream
 * @return an enumeratee from `T` to `List[T]`
 */
def chunker[T](size: Int): Enumeratee[T, List[T]] = {
  // Inner consumer: read at most `size` elements and return them as one List.
  val takeUpToSize: Iteratee[T, List[T]] =
    Enumeratee.take[T](size).transform(Iteratee.getChunks[T])

  // Re-run the inner consumer over the stream, emitting each of its results.
  Enumeratee.grouped[T](takeUpToSize)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment