Skip to content

Instantly share code, notes, and snippets.

@kevinwright
Last active April 14, 2016 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinwright/93036260501236ec629105f81fa78da0 to your computer and use it in GitHub Desktop.
Save kevinwright/93036260501236ec629105f81fa78da0 to your computer and use it in GitHub Desktop.
ThrottledRunner
import scala.collection.generic.CanBuildFrom
import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.{Failure, Success}
object AsyncUtils {
/**
* A subtle variant on `Future.traverse` that forces the Futures to be executed one at a time
* instead of allowing parallelism
*/
def sequencedTraverse
[A, B, M[X] <: TraversableOnce[X]]
(in: M[A])
(fn: A => Future[B])
(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext)
: Future[M[B]] = {
in.foldLeft(Future.successful(cbf(in))){ (facc, a) =>
for (acc <- facc; b <- fn(a)) yield (acc += b)
}.map(_.result())
}
def throttledTraverse
[A, B, M[X] <: TraversableOnce[X]]
(in: M[A], maxConcurrents: Int)
(fn: A => Future[B])
(implicit
cbf1: CanBuildFrom[M[A], Future[B], M[Future[B]]],
cbf2: CanBuildFrom[M[Future[B]], B, M[B]],
executor: ExecutionContext
)
: Future[M[B]] = {
Future.sequence(ThrottledRunner.run(maxConcurrents, in, fn))
}
}
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
final case class SharedThrottle(maxConcurrents: Int) {
require(maxConcurrents > 0, s"SharedThrottle.maxConcurrents must be greater than 0 but was $maxConcurrents")
private[this] val counter = new AtomicInteger(0)
/**
* Try to grab an available slot, uses a CAS operation internally to avoid locking,
* will self-recurse until this succeeds
* @return true if a slot was available and we acquired it, false if not slots free
*/
@tailrec def seizeSlot(): Boolean = {
val n = counter.get
@inline def plusone = n + 1
n < maxConcurrents && (counter.compareAndSet(n, plusone) || seizeSlot())
}
def releaseSlot(): Unit = { val _ = counter.decrementAndGet() }
def slotsUsed: Int = counter.get
}
import scala.language.higherKinds
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
object ThrottledRunner {
def run[A, B, M[X] <: TraversableOnce[X]](
maxConcurrents: Int,
in: M[A],
fn: A => Future[B]
)(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext)
: M[Future[B]] = run(SharedThrottle(maxConcurrents), in, fn)
def run[A, B, M[X] <: TraversableOnce[X]](
throttle: SharedThrottle,
in: M[A],
fn: A => Future[B]
)(implicit cbf: CanBuildFrom[M[A], Future[B], M[Future[B]]], ec: ExecutionContext): M[Future[B]] = {
import throttle._
object queue extends ConcurrentLinkedQueue[(A, Promise[B])] {
val results = new ConcurrentLinkedQueue[Future[B]]
def addItem(item: A): Boolean = add(item -> Promise[B])
override def add(pair: (A, Promise[B])): Boolean = {
val r = super.add(pair)
results add pair._2.future
attach()
r
}
final def completedTaskHandler(comp: Try[B], promise: Promise[B]): Unit = {
releaseSlot()
comp match {
case Success(result) =>
promise success result
attach()
case Failure(x) => val _ = promise failure x
}
}
final def attach(): Unit =
if(!isEmpty && seizeSlot()) {
poll() match {
case null => ()
case (x, promise) => fn(x) onComplete { comp => completedTaskHandler(comp, promise) }
}
}
}
in foreach queue.addItem
queue.results.iterator().asScala.to[M]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment