Skip to content

Instantly share code, notes, and snippets.

@JohnMurray
Created June 22, 2018 00:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JohnMurray/34e3beb7f5eed4935f70dc45b0256067 to your computer and use it in GitHub Desktop.
Save JohnMurray/34e3beb7f5eed4935f70dc45b0256067 to your computer and use it in GitHub Desktop.
package scratchspace
import java.time.LocalDateTime
import java.util.{Deque, LinkedList}
import scala.concurrent.{Future, Promise, TimeoutException}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}
import java.time.temporal.ChronoUnit
/** Stand-in for a remote API, offering a few fake endpoints that make it easy to play with
  * the rate-limiting code in this file. Small enough to experiment with from the REPL.
  *
  * Every endpoint returns a [[Future]] that either yields the fixed payload, yields a
  * rate-limit marker string, or fails with a [[MockAPI.RateLimitException]], depending on a
  * random roll in the range 0 to 99.
  */
object MockAPI {
  private val rand = new scala.util.Random()

  /** Payload returned by every endpoint when the call is not rate-limited. */
  private val Payload = "1, 2, 3, 4, 5, 6"

  /** In-band marker returned by the endpoints that signal rate-limiting without throwing. */
  private val RateLimitMarker = "RATELIMIT_EXCEEDED"

  /** Failure raised by the endpoints that signal rate-limiting by throwing. */
  class RateLimitException(msg: String) extends Throwable(msg, null)

  /** Always succeeds with the payload. */
  def simpleService: Future[String] = Future(Payload)

  /** Fails with a [[RateLimitException]] when the roll is above 50, otherwise yields the payload. */
  def throwingRateLimitedService: Future[String] = Future {
    val roll = rand.nextInt(100)
    if (roll <= 50) Payload
    else throw new RateLimitException("Rate limit exceeded")
  }

  /** Yields the rate-limit marker when the roll is above 50, otherwise yields the payload. */
  def nonThrowingRateLimitedService: Future[String] = Future {
    rand.nextInt(100) match {
      case roll if roll > 50 => RateLimitMarker
      case _                 => Payload
    }
  }

  /** Fails with a [[RateLimitException]] when the roll is above 75, yields the rate-limit
    * marker when it is above 50, and yields the payload otherwise.
    */
  def maybeThrowingRateLimitedService: Future[String] = Future {
    val roll = rand.nextInt(100)
    if (roll > 75) throw new RateLimitException("Rate limit exceeded")
    else if (roll > 50) RateLimitMarker
    else Payload
  }
}
/** ApiService represents some generic API operation that is executed with [[Future]]s.
  * Requests submitted to the service are managed via an internal queue and accounting
  * structures used to ensure that the number of requests made within a given time-window
  * does not exceed a pre-defined rate-limit.
  *
  * All queue and window accounting is guarded by this instance's monitor. User-supplied
  * request functions are invoked while that monitor is held, so they should return their
  * [[Future]] promptly rather than block.
  *
  * NOTE(review): all timing is based on `LocalDateTime.now()`, i.e. the local wall clock, so
  * clock adjustments can presumably stretch or shrink a window -- confirm whether this matters
  * for your deployment.
  *
  * @param limit     The maximum number of operations to perform within a time-window
  * @param timeFrame The duration of the time-window (honoured at nanosecond precision)
  *
  * @tparam T The result-type of requests to the ApiService.
  */
class ApiService[T](limit: Long, timeFrame: FiniteDuration) {

  /** Decides whether the outcome of a request (`Left` = failed future, `Right` = successful
    * value) was a rate-limit rejection. Outcomes the function is not defined for are treated
    * as "not rate-limited".
    */
  type LimitDetector = PartialFunction[Either[Throwable, T], Boolean]

  /** Request (user provided) and meta-data regarding the request, bundled together
    * for convenience and placed into the queue. This avoids us having to keep more
    * accounting structures within ApiService.
    *
    * @param f             User provided API call
    * @param p             [[Promise]] backing the [[Future]] handed to the client by [[request]]
    * @param limitDetector Partial function to detect rate-limit failures
    * @param startDeadline The deadline for this to start executing (`None` = wait forever)
    */
  case class RequestQueueItem(f: () => Future[T],
                              p: Promise[T],
                              limitDetector: LimitDetector,
                              startDeadline: Option[LocalDateTime])

  // The queue itself is mutated in place, so the reference never needs to change.
  private val requestQueue: Deque[RequestQueueItem] = new LinkedList[RequestQueueItem]()
  private var requestCount: Long = 0
  private var windowStopTime: LocalDateTime = LocalDateTime.now()
  private var cleanupTimeFrame: FiniteDuration = timeFrame
  private var nextCleanupTime: LocalDateTime = LocalDateTime.now()
  private var limitDetection: LimitDetector = {
    case Left(_)  => true
    case Right(_) => false
  }
  private val timerPool = new ScheduledThreadPoolExecutor(1)
  @volatile private var recheckFut: Option[ScheduledFuture[_]] = None

  /** Override the default rate-limit-error detection code. By default it is assumed
    * that any failed Future is a rate-limit exception, which is likely not what you want.
    *
    * @param d Detector used by every later [[request]] call that does not pass its own
    */
  def withDefaultLimitDetection(d: LimitDetector): Unit = {
    limitDetection = d
  }

  /** Submit a function as part of the group of requests managed under this service's
    * rate-limits. The request given will be queued up for execution within the first
    * available time-window. That window is dependent upon the configured rate-limit
    * quotas and the current size of the request queue.
    *
    * @param f            User-defined request function to execute
    * @param maxQueueTime The maximum time the request may wait in the queue before it is
    *                     started; once exceeded the returned future fails with a
    *                     [[TimeoutException]]. A non-finite duration means "wait forever".
    * @param d            Override the default/configured rate-limit-error detection
    *
    * @return [[Future]] representing the eventual execution of `f`
    */
  def request(f: () => Future[T], maxQueueTime: Duration = Duration.Inf)
             (d: LimitDetector = limitDetection)
             : Future[T] = this.synchronized {
    val p = Promise[T]()
    // Nanosecond precision: converting via whole seconds would turn e.g. 500.millis into an
    // immediate deadline.
    val startDeadline = maxQueueTime match {
      case finite: FiniteDuration => Some(LocalDateTime.now().plusNanos(finite.toNanos))
      case _                      => None
    }
    requestQueue.offerLast(RequestQueueItem(f, p, d, startDeadline))
    tryRequest()
    p.future
  }

  /** Dispatch queued requests for as long as the current window has capacity. When capacity
    * runs out, arm a timer that re-runs this method once the window ends.
    *
    * Implemented as a loop rather than by recursing once per dispatched request, so the stack
    * depth does not grow with `limit`.
    */
  private def tryRequest(): Unit = this.synchronized {
    var draining = true
    while (draining) {
      cleanupRequests()
      if (hasCapacity()) {
        cancelRecheck()
        draining = makeRequest() // false once the queue is empty
      } else {
        scheduleRecheck()
        draining = false
      }
    }
  }

  /** Cancel the pending wake-up timer, if any. Only called with the monitor held. */
  private def cancelRecheck(): Unit = {
    // cancel(false): never interrupt the timer thread if it is already running the wake-up,
    // otherwise user request functions could be invoked with the interrupt flag set.
    recheckFut.foreach(_.cancel(false))
    recheckFut = None
  }

  /** Arm a one-shot timer that retries dispatching when the current window ends, unless one
    * is already pending. Only called with the monitor held.
    */
  private def scheduleRecheck(): Unit = {
    if (recheckFut.isEmpty) {
      val timeRemaining = ChronoUnit.NANOS.between(LocalDateTime.now(), windowStopTime)
      val wakeUp: Runnable = () => ApiService.this.synchronized {
        // Clear the handle under the lock so it cannot race with cancelRecheck().
        recheckFut = None
        tryRequest()
      }
      recheckFut = Some(timerPool.schedule(wakeUp, timeRemaining, TimeUnit.NANOSECONDS))
    }
  }

  /** Modify the instance with a custom cleanup time.
    * By default the cleanup interval is the same as `timeFrame`.
    *
    * @param window The interval at which the instance should clear pending requests
    *               which have exceeded their timeout
    * @return The current instance (for chaining configuration methods)
    */
  def withCleanupWindow(window: FiniteDuration): ApiService[T] = {
    cleanupTimeFrame = window
    this
  }

  /** Scan the queue for all requests that have exceeded their time limit and fail them with
    * a [[TimeoutException]]. Because time-limits are custom to individual requests, the
    * entire queue must be scanned; the scan is therefore only performed once per cleanup
    * interval.
    */
  def cleanupRequests(): Unit = this.synchronized {
    val now = LocalDateTime.now()
    if (now.isAfter(nextCleanupTime)) {
      requestQueue.removeIf { item =>
        val expired = item.startDeadline.exists(now.isAfter)
        if (expired) expire(item)
        expired
      }
      nextCleanupTime = now.plusNanos(cleanupTimeFrame.toNanos)
    }
  }

  /** Fail a queued request whose start-deadline has passed. */
  private def expire(item: RequestQueueItem): Unit = {
    item.p.tryFailure(new TimeoutException("request exceeded its maximum queue time"))
    ()
  }

  /** Pop items from the head of the queue until one is found whose start-deadline has not
    * passed. Expired items encountered on the way are failed with a [[TimeoutException]]
    * instead of being dropped, so their callers are never left waiting.
    *
    * @return The next request to execute, or `None` when the queue is exhausted
    */
  @scala.annotation.tailrec
  private def nextLiveRequest(now: LocalDateTime): Option[RequestQueueItem] =
    Option(requestQueue.pollFirst()) match {
      case None                                                         => None
      case live @ Some(item) if item.startDeadline.forall(now.isBefore) => live
      case Some(stale) =>
        expire(stale)
        nextLiveRequest(now)
    }

  /** Put a rate-limited request back at the head of the queue and try dispatching again. */
  private def requeueAtHead(req: RequestQueueItem): Unit = this.synchronized {
    requestQueue.offerFirst(req)
    tryRequest()
  }

  /** Execute a request from the head of the queue. If the request fails due to rate-limit
    * errors, requeue it at the head of the queue and call [[tryRequest]]. Otherwise yield
    * the outcome (success or failure) back to the caller.
    *
    * Only called with the monitor held.
    *
    * @return `true` if a request was dispatched, `false` if the queue was empty
    */
  private def makeRequest(): Boolean =
    nextLiveRequest(LocalDateTime.now()) match {
      case None => false
      case Some(req) =>
        requestCount += 1
        // A request function that throws synchronously is treated like a failed Future so
        // that the caller's promise is always completed.
        val result: Future[T] =
          try req.f()
          catch { case scala.util.control.NonFatal(e) => Future.failed[T](e) }
        result.onComplete { outcome =>
          // The detector is partial: "not defined" means "not a rate-limit outcome". Any
          // exception thrown by the detector itself is surfaced through the promise.
          val rateLimited = scala.util.Try(
            req.limitDetector.applyOrElse(outcome.toEither, (_: Either[Throwable, T]) => false)
          )
          rateLimited match {
            case Success(true)          => requeueAtHead(req)
            case Success(false)         => req.p.tryComplete(outcome)
            case Failure(detectorError) => req.p.tryFailure(detectorError)
          }
        }
        true
    }

  /** Check if we have capacity within the current time-window. If the current window has
    * ended, start a new one and reset the internal accounting.
    *
    * @return True if a request can be made within the current window, false otherwise
    */
  private def hasCapacity(): Boolean = {
    val now = LocalDateTime.now()
    // ">= windowStopTime" rather than a strict "after": the wake-up timer fires right at the
    // window end, and a strict comparison would make it reschedule itself with zero delay
    // until the clock advances.
    if (!now.isBefore(windowStopTime)) {
      windowStopTime = now.plusNanos(timeFrame.toNanos)
      requestCount = 0
      true
    } else {
      requestCount < limit
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment