Skip to content

Instantly share code, notes, and snippets.

@stephennancekivell
Created August 1, 2017 09:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stephennancekivell/822592b3f32d906c04bf9acbde967c7a to your computer and use it in GitHub Desktop.
Save stephennancekivell/822592b3f32d906c04bf9acbde967c7a to your computer and use it in GitHub Desktop.
Scala Future gatherUnordered Usage
import $ivy.`net.databinder.dispatch::dispatch-core:0.13.1`
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import dispatch._, Defaults._
val svc = url("http://httpbin.org/ip")
def get() = Http.default(svc OK as.String)
def getAndTime(idx: Int) = {
val start = System.currentTimeMillis()
println(start + "\t" + idx + " starting")
get().map { _ =>
// println("\t" + idx + " done took "+ (System.currentTimeMillis() - start) + "ms")
println(System.currentTimeMillis() + "\t" + idx + "done")
}
}
/**
* Like Future.sequence but but limits the parallelism to only process so many futures at once.
* Useful if you have lots of future operations but dont want to overload something with too many at once.
*/
def gatherUnordered[A, B](parallelism: Int,
xs: Seq[A])(fn: A => Future[B]): Future[Seq[B]] = {
def go(todo: Seq[A], inprogress: Seq[Future[B]], acc: Seq[B]): Future[Seq[B]] = {
println(s"go parallelism# $parallelism todo# ${todo.size} #inprogress ${inprogress.size} #acc ${acc.size}")
if (inprogress.size < parallelism && todo.nonEmpty) {
val numToAdd = parallelism - inprogress.size
val (toStart, stillTodo) = todo.splitAt(numToAdd)
go(stillTodo, toStart.map(fn) ++ inprogress, acc)
} else if (todo.isEmpty && inprogress.isEmpty) {
Future.successful(acc)
} else {
Future.firstCompletedOf(inprogress).flatMap { _ =>
val (doneFutures, stillInProgress) = inprogress.partition(_.isCompleted)
Future.sequence(doneFutures).flatMap { done =>
go(todo, stillInProgress, acc ++ done)
}
}
}
}
go(xs, Nil, Nil)
}
val xs = 0 to 10
val done = gatherUnordered(3, xs) { idx =>
getAndTime(idx)
}
import scala.concurrent.duration._
println(scala.concurrent.Await.ready(done, 20.seconds))
go parallelism# 3 todo# 11 #inprogress 0 #acc 0
1501581041113 0 starting
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
1501581041561 1 starting
1501581041561 2 starting
go parallelism# 3 todo# 8 #inprogress 3 #acc 0
1501581042083 1done
1501581042083 2done
1501581042083 0done
go parallelism# 3 todo# 8 #inprogress 0 #acc 3
1501581042090 3 starting
1501581042093 4 starting
1501581042093 5 starting
go parallelism# 3 todo# 5 #inprogress 3 #acc 3
1501581042475 5done
1501581042475 4done
1501581042476 3done
go parallelism# 3 todo# 5 #inprogress 1 #acc 5
1501581042476 6 starting
1501581042476 7 starting
go parallelism# 3 todo# 3 #inprogress 3 #acc 5
go parallelism# 3 todo# 3 #inprogress 2 #acc 6
1501581042477 8 starting
go parallelism# 3 todo# 2 #inprogress 3 #acc 6
1501581042782 7done
1501581042782 8done
1501581042783 6done
go parallelism# 3 todo# 2 #inprogress 1 #acc 8
1501581042783 9 starting
1501581042783 10 starting
go parallelism# 3 todo# 0 #inprogress 3 #acc 8
go parallelism# 3 todo# 0 #inprogress 2 #acc 9
1501581043089 10done
1501581043089 9done
go parallelism# 3 todo# 0 #inprogress 1 #acc 10
go parallelism# 3 todo# 0 #inprogress 0 #acc 11
Future(Success(List((), (), (), (), (), (), (), (), (), (), ())))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment