Created
August 1, 2017 09:53
-
-
Save stephennancekivell/822592b3f32d906c04bf9acbde967c7a to your computer and use it in GitHub Desktop.
Scala Future gatherUnordered Usage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`net.databinder.dispatch::dispatch-core:0.13.1` | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.Future | |
import dispatch._, Defaults._ | |
val svc = url("http://httpbin.org/ip") | |
def get() = Http.default(svc OK as.String) | |
def getAndTime(idx: Int) = { | |
val start = System.currentTimeMillis() | |
println(start + "\t" + idx + " starting") | |
get().map { _ => | |
// println("\t" + idx + " done took "+ (System.currentTimeMillis() - start) + "ms") | |
println(System.currentTimeMillis() + "\t" + idx + "done") | |
} | |
} | |
/** | |
* Like Future.sequence but but limits the parallelism to only process so many futures at once. | |
* Useful if you have lots of future operations but dont want to overload something with too many at once. | |
*/ | |
def gatherUnordered[A, B](parallelism: Int, | |
xs: Seq[A])(fn: A => Future[B]): Future[Seq[B]] = { | |
def go(todo: Seq[A], inprogress: Seq[Future[B]], acc: Seq[B]): Future[Seq[B]] = { | |
println(s"go parallelism# $parallelism todo# ${todo.size} #inprogress ${inprogress.size} #acc ${acc.size}") | |
if (inprogress.size < parallelism && todo.nonEmpty) { | |
val numToAdd = parallelism - inprogress.size | |
val (toStart, stillTodo) = todo.splitAt(numToAdd) | |
go(stillTodo, toStart.map(fn) ++ inprogress, acc) | |
} else if (todo.isEmpty && inprogress.isEmpty) { | |
Future.successful(acc) | |
} else { | |
Future.firstCompletedOf(inprogress).flatMap { _ => | |
val (doneFutures, stillInProgress) = inprogress.partition(_.isCompleted) | |
Future.sequence(doneFutures).flatMap { done => | |
go(todo, stillInProgress, acc ++ done) | |
} | |
} | |
} | |
} | |
go(xs, Nil, Nil) | |
} | |
val xs = 0 to 10 | |
val done = gatherUnordered(3, xs) { idx => | |
getAndTime(idx) | |
} | |
import scala.concurrent.duration._ | |
println(scala.concurrent.Await.ready(done, 20.seconds)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
go parallelism# 3 todo# 11 #inprogress 0 #acc 0 | |
1501581041113 0 starting | |
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". | |
SLF4J: Defaulting to no-operation (NOP) logger implementation | |
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. | |
1501581041561 1 starting | |
1501581041561 2 starting | |
go parallelism# 3 todo# 8 #inprogress 3 #acc 0 | |
1501581042083 1done | |
1501581042083 2done | |
1501581042083 0done | |
go parallelism# 3 todo# 8 #inprogress 0 #acc 3 | |
1501581042090 3 starting | |
1501581042093 4 starting | |
1501581042093 5 starting | |
go parallelism# 3 todo# 5 #inprogress 3 #acc 3 | |
1501581042475 5done | |
1501581042475 4done | |
1501581042476 3done | |
go parallelism# 3 todo# 5 #inprogress 1 #acc 5 | |
1501581042476 6 starting | |
1501581042476 7 starting | |
go parallelism# 3 todo# 3 #inprogress 3 #acc 5 | |
go parallelism# 3 todo# 3 #inprogress 2 #acc 6 | |
1501581042477 8 starting | |
go parallelism# 3 todo# 2 #inprogress 3 #acc 6 | |
1501581042782 7done | |
1501581042782 8done | |
1501581042783 6done | |
go parallelism# 3 todo# 2 #inprogress 1 #acc 8 | |
1501581042783 9 starting | |
1501581042783 10 starting | |
go parallelism# 3 todo# 0 #inprogress 3 #acc 8 | |
go parallelism# 3 todo# 0 #inprogress 2 #acc 9 | |
1501581043089 10done | |
1501581043089 9done | |
go parallelism# 3 todo# 0 #inprogress 1 #acc 10 | |
go parallelism# 3 todo# 0 #inprogress 0 #acc 11 | |
Future(Success(List((), (), (), (), (), (), (), (), (), (), ()))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment