Skip to content

Instantly share code, notes, and snippets.

@randomstatistic
Created July 17, 2015 18:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save randomstatistic/b8b05efcc156a349b0d8 to your computer and use it in GitHub Desktop.
Save randomstatistic/b8b05efcc156a349b0d8 to your computer and use it in GitHub Desktop.
Block simultaneous future creation beyond a threshold
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.Future
/**
* Thread-safe lock on Future generation. The put() method accepts futures without blocking so long as there are less than
* $size futures that have been added via put() that are still alive. If more than $size futures are still running,
* calling put() *blocks* the calling thread until some of the current futures finish.
* It's highly recommended that futures provided to put() have some kind of timeout.
*/
class FutureBuffer[T](size: Int) {
private val bq = new LinkedBlockingQueue[Future[T]](size)
// A snapshot of the futures currently running
def inFlight() = bq.iterator().asScala
// Blocks iff there are $size futures currently in flight
@tailrec
final def put(f: Future[T], delayMs: Int = 0): Boolean = {
val accepted =
if (delayMs == 0) {
bq.offer(f) // this offer API is less work
} else {
bq.offer(f, delayMs, TimeUnit.MILLISECONDS)
}
accepted match {
case true =>
true
case false =>
clean() // try to make room
put(f, 50) // try again with an offer delay so we don't spin cycle too fast
}
}
private def clean() = {
inFlight().foreach(f =>
if (f.isCompleted) bq.remove(f)
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment