Last active
June 5, 2017 01:13
-
-
Save sgrankin/105e33eaf9ee8730eb7d9ba8661b6140 to your computer and use it in GitHub Desktop.
Finagle CoDel (modified) & Adaptive LIFO queues. Refs: https://github.com/facebook/wangle/blob/master/wangle/concurrent/Codel.cpp, http://queue.acm.org/appendices/codel.html
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import java.util | |
import java.util.concurrent.RejectedExecutionException | |
import com.twitter.app.App | |
import com.twitter.concurrent.Permit | |
import com.twitter.finagle.http.{Request, Response} | |
import com.twitter.finagle.util.HashedWheelTimer | |
import com.twitter.finagle.{Http, Service, param => fparam, _} | |
import com.twitter.util.TimeConversions._ | |
import com.twitter.util._ | |
import scala.util.Random | |
/**
 * Policy that decides, from observed queueing delay, whether a request
 * should be rejected instead of served.
 */
trait DelayControl {

  /**
   * Records one observed queueing delay and reports whether the request
   * that experienced it should be dropped.
   *
   * @param delay how long the request waited before it could proceed
   * @return `true` if the request should be rejected
   */
  def shouldDrop(delay: Duration): Boolean

  /** Whether the controller currently considers the queue overloaded. */
  def overloaded: Boolean

  /** The smallest delay the controller is currently tracking. */
  def minDelay: Duration
}
/**
 * A CoDel-inspired [[DelayControl]].
 *
 * Time is divided into windows of length `interval`. During a window the
 * minimum observed delay is tracked; when a window ends, the controller is
 * marked overloaded if that minimum exceeded `target`. While overloaded,
 * any request whose delay is more than twice `target` is dropped.
 *
 * All mutable state is guarded by this instance's monitor, including the
 * read-only accessors, so values read from other threads (for example by
 * stats gauges) are never stale.
 *
 * @param target   the delay above which a window counts as overloaded
 * @param interval the length of each observation window
 */
case class CoDelDelayControl(
  target: Duration,
  interval: Duration
) extends DelayControl {
  // Guarded by `this`.
  private[this] var _minDelay = Duration.Zero
  private[this] var _overloaded = false
  private[this] var _intervalEnd = Time.now

  /** Whether the most recently completed window had a minimum delay above `target`. */
  override def overloaded: Boolean = synchronized { _overloaded }

  /** The minimum delay observed so far in the current window. */
  override def minDelay: Duration = synchronized { _minDelay }

  /**
   * Records `delay` and decides whether the request should be dropped.
   *
   * @param delay how long the request waited
   * @return `true` only when the controller is overloaded and `delay`
   *         exceeds twice `target`; the first observation of a new window
   *         is never dropped
   */
  override def shouldDrop(delay: Duration): Boolean = synchronized {
    val now = Time.now
    if (now > _intervalEnd) {
      // The window has ended: judge it by its minimum delay, then start a
      // new window seeded with the current observation.
      _intervalEnd = now + interval
      _overloaded = _minDelay > target
      _minDelay = delay
      false
    } else {
      if (delay < _minDelay)
        _minDelay = delay
      _overloaded && delay > target * 2
    }
  }
}
/**
 * An asynchronous admission gate: callers obtain a `Permit` before
 * proceeding and release it when they are done.
 */
trait AsyncGate {

  /** Requests a permit; the returned future completes once one is granted or fails if none can be. */
  def acquire(): Future[Permit]

  /** Number of permits currently handed out. */
  def concurrency: Int

  /** Number of callers currently queued for a permit. */
  def waiters: Int
}
/**
 * An [[AsyncGate]] with a bounded wait queue that switches from FIFO to
 * LIFO hand-off as the queue fills up.
 *
 * While at most `ceil(maxWaiters * lifoThreshold)` callers are waiting, a
 * released permit goes to the oldest waiter. Once more callers than that
 * are queued, it goes to the newest waiter instead.
 *
 * All state is guarded by this instance's monitor.
 *
 * @param initialPermits number of permits available up front; must be positive
 * @param maxWaiters     maximum number of queued callers; must be non-negative
 * @param lifoThreshold  fraction of `maxWaiters`, in `[0.0, 1.0]`, above which
 *                       hand-off becomes LIFO
 */
case class AdaptiveLifoAsyncSemaphore(
  initialPermits: Int,
  maxWaiters: Int,
  lifoThreshold: Double
) extends AsyncGate { self =>
  import AdaptiveLifoAsyncSemaphore._

  require(maxWaiters >= 0, s"maxWaiters must be non-negative: $maxWaiters")
  require(initialPermits > 0, s"initialPermits must be positive: $initialPermits")
  // The upper bound is 1.0, matching the documented range in the message.
  require(0.0 <= lifoThreshold && lifoThreshold <= 1.0, s"lifoThreshold must be in range [0.0,1.0]")

  private[this] var closed: Option[Throwable] = None
  private[this] val waitq = new util.ArrayDeque[Promise[Permit]]
  private[this] var availablePermits = initialPermits
  private[this] val lifoThresholdWaiters = (maxWaiters * lifoThreshold).ceil.toInt

  // A single permit instance is shared by every holder; releasing it either
  // wakes a waiter or returns the permit to the pool.
  private[this] val semaphorePermit = new Permit {

    /**
     * Indicate that you are done with your Permit.
     */
    override def release(): Unit = self.synchronized {
      val next =
        if (waitq.size() > lifoThresholdWaiters) waitq.pollLast() // newest waiter
        else waitq.pollFirst() // oldest waiter
      if (next != null)
        next.setValue(this)
      else
        availablePermits += 1
    }
  }

  private[this] val futurePermit = Future.value(semaphorePermit)

  /** Number of permits currently handed out. */
  def concurrency: Int = synchronized { initialPermits - availablePermits }

  /** Number of callers currently queued for a permit. */
  def waiters: Int = synchronized { waitq.size() }

  /**
   * Requests a permit.
   *
   * @return an already-satisfied future if a permit is free; a failed future
   *         if the wait queue is full (or the semaphore has been closed);
   *         otherwise a pending future that is satisfied by a later `release()`.
   *         Interrupting the pending future fails it and removes it from the queue.
   */
  def acquire(): Future[Permit] = self.synchronized {
    closed match {
      case Some(cause) =>
        Future.exception(cause)
      case None =>
        if (availablePermits > 0) {
          availablePermits -= 1
          futurePermit
        } else if (waitq.size >= maxWaiters) {
          MaxWaitersExceededException
        } else {
          val promise = new Promise[Permit]
          promise.setInterruptHandler {
            case t: Throwable =>
              self.synchronized {
                // Only dequeue if the interrupt won the race with release().
                if (promise.updateIfEmpty(Throw(t)))
                  waitq.remove(promise)
              }
          }
          waitq.addLast(promise)
          promise
        }
    }
  }
}
object AdaptiveLifoAsyncSemaphore {

  /** Pre-built failed future returned whenever the wait queue is already full. */
  private val MaxWaitersExceededException: Future[Nothing] = {
    val rejection = new RejectedExecutionException("Max waiters exceeded")
    Future.exception(rejection)
  }
}
/**
 * Stack role, stack params and module factory for [[RequestSemaphoreFilter]].
 */
object RequestSemaphoreFilter {
  val role = Stack.Role("RequestConcurrencyLimit")

  object param {

    /**
     * A class eligible for configuring a [[com.twitter.finagle.Stackable]]
     * [[RequestSemaphoreFilter]] module with the gate that limits concurrency.
     */
    case class Gate(sem: AsyncGate) {
      def mk(): (Gate, Stack.Param[Gate]) = (this, Gate.param)
    }

    object Gate {
      // Default: permits = max(2 * numProcs, 8), queue = half of that,
      // LIFO hand-off once the queue is 70% full.
      implicit val param: Stack.Param[Gate] = Stack.Param {
        val threadPool = math.max((com.twitter.jvm.numProcs() * 2).ceil.toInt, 8)
        Gate(
          new AdaptiveLifoAsyncSemaphore(
            initialPermits = threadPool,
            maxWaiters = threadPool / 2,
            lifoThreshold = 0.7)
        )
      }
    }

    /** Configures the [[DelayControl]] used to shed requests that queued too long. */
    case class Control(control: DelayControl) {
      def mk(): (Control, Stack.Param[Control]) = (this, Control.param)
    }

    object Control {
      implicit val param: Stack.Param[Control] = Stack.Param {
        Control(
          new CoDelDelayControl(
            target = 5.milliseconds,
            interval = 100.milliseconds
          ))
      }
    }

    /** Configures how long a request may wait for a permit. */
    case class Timeout(timeout: Duration) {
      def mk(): (Timeout, Stack.Param[Timeout]) = (this, Timeout.param)
    }

    object Timeout {
      // Default: wait indefinitely.
      implicit val param: Stack.Param[Timeout] = Stack.Param { Timeout(Duration.Top) }
    }
  }

  /**
   * Creates a [[com.twitter.finagle.Stackable]] [[RequestSemaphoreFilter]].
   */
  def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
    new Stack.Module4[param.Gate, param.Control, param.Timeout, fparam.Stats, ServiceFactory[Req, Rep]] {
      val role = RequestSemaphoreFilter.role
      val description = "Restrict number of concurrent requests"

      def make(
        _gate: param.Gate,
        _control: param.Control,
        _timeout: param.Timeout,
        _stats: fparam.Stats,
        next: ServiceFactory[Req, Rep]
      ): ServiceFactory[Req, Rep] = {
        val param.Gate(sem) = _gate
        val param.Control(control) = _control
        val param.Timeout(timeout) = _timeout
        val fparam.Stats(sr) = _stats

        val filter = new RequestSemaphoreFilter[Req, Rep](sem, control, timeout) {
          // We capture the gauges inside of here so their
          // (reference) lifetime is tied to that of the filter
          // itself.
          val gauges = Seq(
            sr.addGauge("request_concurrency") { sem.concurrency },
            sr.addGauge("request_queue_size") { sem.waiters },
            sr.addGauge("request_queue_overloaded") { if (control.overloaded) 1.0f else 0.0f },
            sr.addGauge("request_queue_min_delay") { control.minDelay.inMilliseconds.toFloat }
          )
        }
        filter andThen next
      }
    }
}
/**
 * A filter that admits a request only after it obtains a permit from `sem`,
 * and rejects it if obtaining the permit took too long.
 *
 * @param sem     gate from which a permit is acquired for each request
 * @param control consulted with the time spent waiting for the permit; the
 *                request is rejected when it says to drop
 * @param timeout maximum time to wait for a permit
 */
class RequestSemaphoreFilter[Req, Rep](
  sem: AsyncGate,
  control: DelayControl,
  timeout: Duration
) extends SimpleFilter[Req, Rep] {
  val timer = HashedWheelTimer.Default

  /**
   * Acquires a permit, then either serves the request (releasing the permit
   * when the response completes) or fails it with a rejected `Failure`.
   */
  def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
    val timeIn = Time.now
    val acquired = sem.acquire()
    acquired.within(timer, timeout).transform {
      case Return(permit) =>
        val timeOut = Time.now
        if (control.shouldDrop(timeOut - timeIn)) {
          permit.release()
          Future.exception(Failure.rejected("due to delay control"))
        } else
          service(req).ensure { permit.release() }
      case Throw(noPermit) =>
        // `within` does not interrupt the underlying future on timeout, so
        // explicitly withdraw the pending acquisition from the gate ...
        acquired.raise(noPermit)
        // ... and, if a permit was granted concurrently, give it back
        // rather than leaking it. Both calls are no-ops when `acquired`
        // itself already failed.
        acquired.onSuccess(_.release())
        Future.exception(Failure.rejected(noPermit))
    }
  }
}
/**
 * Demo server: answers every request with "ok" after a random 5-18 second
 * pause, fronted by either Finagle's built-in concurrency limit or the
 * adaptive-LIFO / CoDel [[RequestSemaphoreFilter]].
 */
object Main extends App {

  /** Slow echo-style service used to build up a request queue. */
  val service: Service[Request, Response] = new Service[Request, Response] {
    def apply(req: Request): Future[Response] = {
      implicit val timer = com.twitter.finagle.util.HashedWheelTimer.Default
      val rep = Response()
      rep.contentString = "ok"
      val pause = (Random.nextInt(14) + 5).seconds
      Future.sleep(pause).before(Future.value(rep))
    }
  }

  def main(): Unit = {
    val pool = 16
    val queue = 8
    val timeout = 30.seconds

    // Flip to true to compare against Finagle's stock admission control.
    val useBuiltinAdmissionControl = false

    val server =
      if (useBuiltinAdmissionControl) {
        Http.server.withAdmissionControl
          .concurrencyLimit(pool, queue)
          .withRequestTimeout(timeout)
      } else {
        val customStack = Http.server.stack.replace(
          RequestSemaphoreFilter.role,
          RequestSemaphoreFilter.module[Request, Response])
        val gate = RequestSemaphoreFilter.param.Gate(AdaptiveLifoAsyncSemaphore(pool, queue, 0.8))
        val delayControl =
          RequestSemaphoreFilter.param.Control(CoDelDelayControl(5.milliseconds, 100.milliseconds))
        Http.server
          .withStack(customStack)
          .configured(gate)
          .configured(delayControl)
          .configured(RequestSemaphoreFilter.param.Timeout(timeout))
      }

    // waits until the server resources are released
    Await.ready(server.serve(":8080", service))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment