Skip to content

Instantly share code, notes, and snippets.

@sgrankin
Last active June 5, 2017 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sgrankin/105e33eaf9ee8730eb7d9ba8661b6140 to your computer and use it in GitHub Desktop.
Save sgrankin/105e33eaf9ee8730eb7d9ba8661b6140 to your computer and use it in GitHub Desktop.
package main
import java.util
import java.util.concurrent.RejectedExecutionException
import com.twitter.app.App
import com.twitter.concurrent.Permit
import com.twitter.finagle.http.{Request, Response}
import com.twitter.finagle.util.HashedWheelTimer
import com.twitter.finagle.{Http, Service, param => fparam, _}
import com.twitter.util.TimeConversions._
import com.twitter.util._
import scala.util.Random
/**
 * Policy that is fed an observed delay and decides whether the work that
 * experienced it should be dropped.
 */
trait DelayControl {

  /** The minimum delay currently tracked by the implementation. */
  def minDelay: Duration

  /** Whether the implementation currently reports an overloaded state. */
  def overloaded: Boolean

  /**
   * Reports whether work that waited for `delay` should be dropped.
   *
   * @param delay the delay observed for the work being considered
   * @return `true` if the work should be dropped
   */
  def shouldDrop(delay: Duration): Boolean
}
/**
 * A [[DelayControl]] that tracks the minimum delay seen per `interval` and
 * sheds slow requests while that minimum stays above `target`.
 *
 * All state is guarded by this instance's monitor, including the read-only
 * accessors, so that readers on other threads see values published by
 * `shouldDrop`.
 *
 * @param target   delay above which an interval's minimum marks the control as overloaded
 * @param interval length of each measurement window
 */
case class CoDelDelayControl(
  target: Duration,
  interval: Duration
) extends DelayControl {
  private[this] var _minDelay = Duration.Zero
  private[this] var _overloaded = false
  private[this] var _intervalEnd = Time.now

  /** Whether the minimum delay of the last completed interval exceeded `target`. */
  override def overloaded: Boolean = synchronized { _overloaded }

  /** The minimum delay observed so far in the current interval. */
  override def minDelay: Duration = synchronized { _minDelay }

  /**
   * Records `delay` and decides whether the request should be dropped.
   *
   * The first call after the current interval has ended starts a new interval:
   * the overload flag is recomputed from the previous interval's minimum, the
   * minimum is reset to `delay`, and the request is never dropped. Any other
   * call updates the minimum and drops only when overloaded and `delay`
   * exceeds twice the `target`.
   *
   * @param delay the delay the request experienced
   * @return `true` if the request should be dropped
   */
  override def shouldDrop(delay: Duration): Boolean = synchronized {
    val now = Time.now
    if (now > _intervalEnd) {
      _intervalEnd = now + interval
      _overloaded = _minDelay > target
      _minDelay = delay
      false
    } else {
      if (delay < _minDelay)
        _minDelay = delay
      _overloaded && delay > target * 2
    }
  }
}
/**
 * Source of asynchronously granted [[com.twitter.concurrent.Permit]]s that
 * also reports how many are in use and how many callers are waiting.
 */
trait AsyncGate {

  /** Number of acquirers currently waiting for a permit. */
  def waiters: Int

  /** Number of permits currently handed out. */
  def concurrency: Int

  /**
   * Requests a permit.
   *
   * @return a future of the permit; the holder is expected to `release()` it
   */
  def acquire(): Future[Permit]
}
/**
 * An [[AsyncGate]] backed by a counting semaphore with a bounded wait queue.
 *
 * Waiters are normally served oldest-first. Once the queue holds more than
 * `ceil(maxWaiters * lifoThreshold)` waiters, a released permit goes to the
 * newest waiter instead.
 *
 * All mutable state is guarded by this instance's monitor.
 *
 * @param initialPermits number of permits available at construction; must be positive
 * @param maxWaiters     maximum queue length; further acquires are rejected; must be non-negative
 * @param lifoThreshold  fraction of `maxWaiters`, in [0.0, 1.0], above which service becomes newest-first
 */
case class AdaptiveLifoAsyncSemaphore(
  initialPermits: Int,
  maxWaiters: Int,
  lifoThreshold: Double
) extends AsyncGate { self =>
  import AdaptiveLifoAsyncSemaphore._

  require(maxWaiters >= 0, s"maxWaiters must be non-negative: $maxWaiters")
  require(initialPermits > 0, s"initialPermits must be positive: $initialPermits")
  // The upper bound is 1.0, matching the range stated in the message.
  require(0.0 <= lifoThreshold && lifoThreshold <= 1.0, s"lifoThreshold must be in range [0.0,1.0]")

  // NOTE(review): nothing in this class ever assigns `closed`; confirm whether
  // a close operation was intended.
  private[this] var closed: Option[Throwable] = None
  private[this] val waitq = new util.ArrayDeque[Promise[Permit]]
  private[this] var availablePermits = initialPermits
  private[this] val lifoThresholdWaiters = (maxWaiters * lifoThreshold).ceil.toInt

  // A single permit instance is shared by every successful acquire.
  private[this] val semaphorePermit = new Permit {

    /**
     * Indicate that you are done with your Permit.
     *
     * Hands the permit to a queued waiter if there is one (newest-first when
     * the queue is above the LIFO threshold, oldest-first otherwise);
     * otherwise returns it to the pool.
     */
    override def release(): Unit = self.synchronized {
      val next =
        if (waitq.size() > lifoThresholdWaiters) waitq.pollLast()
        else waitq.pollFirst()
      if (next != null)
        next.setValue(this)
      else
        availablePermits += 1
    }
  }

  private[this] val futurePermit = Future.value(semaphorePermit)

  /** Number of permits currently handed out. */
  def concurrency: Int = synchronized { initialPermits - availablePermits }

  /** Number of acquirers currently queued. */
  def waiters: Int = synchronized { waitq.size() }

  /**
   * Acquires a permit.
   *
   * @return an already-satisfied future when a permit is free; a failed future
   *         carrying a `RejectedExecutionException` when the queue already
   *         holds `maxWaiters` entries; otherwise a promise satisfied by a
   *         later `release()`. Interrupting that promise fails it with the
   *         interrupt and removes it from the queue.
   */
  def acquire(): Future[Permit] = self.synchronized {
    closed match {
      case Some(cause) =>
        Future.exception(cause)
      case None if availablePermits > 0 =>
        availablePermits -= 1
        futurePermit
      case None if waitq.size >= maxWaiters =>
        MaxWaitersExceededException
      case None =>
        val promise = new Promise[Permit]
        promise.setInterruptHandler {
          case t: Throwable =>
            self.synchronized {
              // Dequeue only if this interrupt is what completed the promise.
              if (promise.updateIfEmpty(Throw(t)))
                waitq.remove(promise)
            }
        }
        waitq.addLast(promise)
        promise
    }
  }
}
object AdaptiveLifoAsyncSemaphore {

  /**
   * Pre-built failed future carrying a `RejectedExecutionException`; created
   * once and shared.
   */
  private val MaxWaitersExceededException = {
    val rejection = new RejectedExecutionException("Max waiters exceeded")
    Future.exception(rejection)
  }
}
/**
 * Stack role, stack params, and stack module for [[RequestSemaphoreFilter]].
 */
object RequestSemaphoreFilter {
  val role: Stack.Role = Stack.Role("RequestConcurrencyLimit")

  object param {

    /**
     * A class eligible for configuring a [[com.twitter.finagle.Stackable]]
     * [[RequestSemaphoreFilter]] module with the gate that hands out permits.
     */
    case class Gate(sem: AsyncGate) {
      def mk(): (Gate, Stack.Param[Gate]) = (this, Gate.param)
    }
    object Gate {
      // Explicit type keeps implicit resolution independent of definition order.
      // Default: permits = max(2 * numProcs, 8), queue = half of that,
      // LIFO threshold = 0.7.
      implicit val param: Stack.Param[Gate] = Stack.Param {
        val threadPool = math.max((com.twitter.jvm.numProcs() * 2).ceil.toInt, 8)
        Gate(
          new AdaptiveLifoAsyncSemaphore(
            initialPermits = threadPool,
            maxWaiters = threadPool / 2,
            lifoThreshold = 0.7)
        )
      }
    }

    /**
     * A class eligible for configuring a [[com.twitter.finagle.Stackable]]
     * [[RequestSemaphoreFilter]] module with the delay-based drop policy.
     */
    case class Control(control: DelayControl) {
      def mk(): (Control, Stack.Param[Control]) = (this, Control.param)
    }
    object Control {
      // Default: CoDel-style control with a 5 ms target and a 100 ms interval.
      implicit val param: Stack.Param[Control] = Stack.Param {
        Control(
          new CoDelDelayControl(
            target = 5.milliseconds,
            interval = 100.milliseconds
          ))
      }
    }

    /**
     * A class eligible for configuring a [[com.twitter.finagle.Stackable]]
     * [[RequestSemaphoreFilter]] module with the time limit applied to
     * acquiring a permit.
     */
    case class Timeout(timeout: Duration) {
      def mk(): (Timeout, Stack.Param[Timeout]) = (this, Timeout.param)
    }
    object Timeout {
      // Default: Duration.Top.
      implicit val param: Stack.Param[Timeout] = Stack.Param { Timeout(Duration.Top) }
    }
  }

  /**
   * Creates a [[com.twitter.finagle.Stackable]] [[RequestSemaphoreFilter]].
   *
   * The module reads the [[param.Gate]], [[param.Control]], [[param.Timeout]]
   * and stats params, and registers four gauges on the stats receiver:
   * `request_concurrency`, `request_queue_size`, `request_queue_overloaded`
   * and `request_queue_min_delay` (milliseconds).
   */
  def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
    new Stack.Module4[param.Gate, param.Control, param.Timeout, fparam.Stats, ServiceFactory[Req, Rep]] {
      val role: Stack.Role = RequestSemaphoreFilter.role
      val description: String = "Restrict number of concurrent requests"

      def make(
        _gate: param.Gate,
        _control: param.Control,
        _timeout: param.Timeout,
        _stats: fparam.Stats,
        next: ServiceFactory[Req, Rep]
      ): ServiceFactory[Req, Rep] = {
        val param.Gate(sem) = _gate
        val param.Control(control) = _control
        val param.Timeout(timeout) = _timeout
        val fparam.Stats(sr) = _stats
        val filter = new RequestSemaphoreFilter[Req, Rep](sem, control, timeout) {
          // We capture the gauges inside of here so their
          // (reference) lifetime is tied to that of the filter
          // itself.
          val gauges = Seq(
            sr.addGauge("request_concurrency") { sem.concurrency },
            sr.addGauge("request_queue_size") { sem.waiters },
            sr.addGauge("request_queue_overloaded") { if (control.overloaded) 1.0f else 0.0f },
            sr.addGauge("request_queue_min_delay") { control.minDelay.inMilliseconds.toFloat }
          )
        }
        filter andThen next
      }
    }
}
/**
 * A filter that admits a request only after obtaining a permit from `sem`,
 * and rejects it if the permit cannot be obtained within `timeout` or if
 * `control` decides the time spent waiting was too long.
 *
 * @param sem     gate that hands out permits
 * @param control policy consulted with the time spent waiting for the permit
 * @param timeout maximum time to wait for a permit
 */
class RequestSemaphoreFilter[Req, Rep](
  sem: AsyncGate,
  control: DelayControl,
  timeout: Duration
) extends SimpleFilter[Req, Rep] {
  val timer = HashedWheelTimer.Default

  /**
   * Acquires a permit, then either dispatches `req` to `service` or rejects it.
   *
   * @return the service's response, or a future failed with a rejected
   *         `Failure` when no permit was obtained or the delay control
   *         dropped the request
   */
  def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
    val timeIn = Time.now
    val acquired = sem.acquire()
    // `raiseWithin` (unlike `within`) interrupts the pending acquire on
    // timeout, so an abandoned waiter is not left behind to receive a permit
    // that nobody would release.
    acquired.raiseWithin(timeout)(timer).transform {
      case Return(permit) =>
        val timeOut = Time.now
        if (control.shouldDrop(timeOut - timeIn)) {
          permit.release()
          Future.exception(Failure.rejected("due to delay control"))
        } else {
          // Capture a synchronous throw from the service as a failed future
          // so the permit is still released.
          val rep =
            try service(req)
            catch { case scala.util.control.NonFatal(e) => Future.exception(e) }
          rep.ensure { permit.release() }
        }
      case Throw(noPermit) =>
        // If the permit was granted after we gave up waiting (timeout racing
        // with a release), hand it straight back. No-op when `acquired` failed.
        acquired.onSuccess { latePermit => latePermit.release() }
        Future.exception(Failure.rejected(noPermit))
    }
  }
}
/**
 * Demo entry point: serves HTTP on port 8080 with a deliberately slow
 * endpoint behind a concurrency limiter.
 */
object Main extends App {

  /** Replies "ok" after a random pause of 5 to 18 seconds. */
  val service: Service[Request, Response] = (req: Request) => {
    implicit val timer = com.twitter.finagle.util.HashedWheelTimer.Default
    val rep = Response()
    rep.contentString = "ok"
    val pause = (Random.nextInt(14) + 5).seconds
    Future.sleep(pause).before(Future.value(rep))
  }

  /** Builds the server with the selected limiter and blocks on it. */
  def main(): Unit = {
    val pool = 16
    val queue = 8
    val timeout = 30.seconds

    // Flip to true to use Finagle's built-in concurrency limit instead.
    val useBuiltInLimit = false

    // Finagle's stock admission control.
    def builtInLimited =
      Http.server.withAdmissionControl
        .concurrencyLimit(pool, queue)
        .withRequestTimeout(timeout)

    // Same stack with the stock module swapped for the one defined in this file.
    def adaptiveLimited = {
      val stack = Http.server.stack.replace(
        RequestSemaphoreFilter.role,
        RequestSemaphoreFilter.module[Request, Response])
      val gate = RequestSemaphoreFilter.param.Gate(AdaptiveLifoAsyncSemaphore(pool, queue, 0.8))
      val control =
        RequestSemaphoreFilter.param.Control(CoDelDelayControl(5.milliseconds, 100.milliseconds))
      Http.server
        .withStack(stack)
        .configured(gate)
        .configured(control)
        .configured(RequestSemaphoreFilter.param.Timeout(timeout))
    }

    val server = if (useBuiltInLimit) builtInLimited else adaptiveLimited

    // waits until the server resources are released
    Await.ready(server.serve(":8080", service))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment