Skip to content

Instantly share code, notes, and snippets.

@kevinwright
Last active October 5, 2018 08:09
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kevinwright/4949162 to your computer and use it in GitHub Desktop.
Save kevinwright/4949162 to your computer and use it in GitHub Desktop.
Limit the number of futures running in parallel (now updated with the seize/release implementations suggested by Roland)
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.annotation.tailrec
//inspired by https://gist.github.com/viktorklang/4552423
/** Factory for an [[scala.concurrent.ExecutionContext]] that caps how many of its
  * tasks are handed to an underlying context at the same time.
  */
object ThrottledExecutionContext {

  /** Builds a throttling wrapper around `context`.
    *
    * Submitted tasks are buffered in a FIFO queue. The wrapper schedules itself on
    * `context` once per free slot, and each scheduled run executes at most one
    * buffered task, so no more than `maxConcurrents` runs are outstanding at once.
    *
    * @param maxConcurrents upper bound on simultaneously scheduled runs; must be > 0
    * @param context        the underlying context that actually executes the work
    * @return an execution context that delegates to `context` under the limit
    * @throws IllegalArgumentException if `maxConcurrents` is not positive
    */
  def apply(maxConcurrents: Int)(implicit context: ExecutionContext): ExecutionContext = {
    require(maxConcurrents > 0, s"ThrottledExecutionContext.maxConcurrents must be greater than 0 but was $maxConcurrents")

    // The instance is simultaneously the task buffer (queue), the unit of work
    // submitted to `context` (Runnable) and the public facade (ExecutionContext).
    new ConcurrentLinkedQueue[Runnable] with Runnable with ExecutionContext {

      /** Number of slots currently claimed, i.e. runs scheduled on `context`. */
      private final val inFlight = new AtomicInteger(0)

      /** Tries to claim a slot; retries only when the CAS lost a race while a slot was still free. */
      @tailrec private def seizeSlot(): Boolean = {
        val current = inFlight.get
        if (current >= maxConcurrents) false
        else if (inFlight.compareAndSet(current, current + 1)) true
        else seizeSlot()
      }

      /** Gives a previously claimed slot back. */
      private def releaseSlot(): Unit = {
        inFlight.decrementAndGet()
        ()
      }

      /** Enqueues `task`, then tries to schedule a run for it. */
      override def add(task: Runnable): Boolean = {
        val accepted = super.add(task)
        attach()
        accepted
      }

      /** Executes at most one buffered task, then frees the slot and re-schedules if work remains. */
      final def run(): Unit =
        try {
          val next = poll()
          if (next != null) {
            try next.run()
            catch { case NonFatal(failure) => context.reportFailure(failure) }
          }
        } finally {
          // Release first so that the attach() below can claim the slot again.
          releaseSlot()
          attach()
        }

      /** Schedules this instance on `context` when work is pending and a slot is free. */
      final def attach(): Unit = {
        val shouldSchedule = !isEmpty && seizeSlot()
        if (shouldSchedule) {
          try context.execute(this)
          catch {
            case t: Throwable =>
              // Submission failed: hand the slot back before propagating.
              releaseSlot()
              throw t
          }
        }
      }

      override final def execute(task: Runnable): Unit = add(task)

      override final def reportFailure(t: Throwable): Unit = context.reportFailure(t)
    }
  }
}
@rkuhn
Copy link

rkuhn commented Feb 28, 2013

Looks interesting for carving out sub-contexts; your seizeSlot and releaseSlot are broken, though, because the CAS failing does not mean that n was increased (as you seem to assume). Why not:

// Tries to claim one slot: reads the counter and, while it is below `max`,
// attempts to CAS it from n to n+1, retrying whenever the CAS fails.
// Returns false as soon as the observed count is not below `max`.
// NOTE(review): `max` is not defined in this snippet — presumably the concurrency limit; confirm.
@tailrec private def seizeSlot(): Boolean = {
  val n = on.get
  n < max && (on.compareAndSet(n, n+1) || seizeSlot())
}
// Gives a slot back by atomically decrementing the counter.
// NOTE(review): the body contains no recursive call, so the @tailrec annotation does not apply here.
@tailrec private def releaseSlot(): Unit = on.decrementAndGet()

@kevinwright
Copy link
Author

@tailrec doesn't make sense on releaseSlot there, but it's definitely cleaner than my implementation. Pushing it through the unit tests now :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment