Created
June 22, 2018 00:52
-
-
Save JohnMurray/34e3beb7f5eed4935f70dc45b0256067 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scratchspace | |
import java.time.LocalDateTime | |
import java.util.{Deque, LinkedList} | |
import scala.concurrent.{Future, Promise, TimeoutException} | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.util.{Failure, Success} | |
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit} | |
import java.time.temporal.ChronoUnit | |
/** A simple test object with some fake methods to make plalying with the rate-limiting code | |
* below easier. Simple enough that you can experiment within the REPL. | |
*/ | |
object MockAPI { | |
private val rand = new scala.util.Random() | |
class RateLimitException(msg: String) extends Throwable(msg, null) | |
def simpleService: Future[String] = Future { "1, 2, 3, 4, 5, 6" } | |
def throwingRateLimitedService: Future[String] = Future { | |
if (rand.nextInt(100) > 50) { | |
throw new RateLimitException("Rate limit exceeded") | |
} | |
"1, 2, 3, 4, 5, 6" | |
} | |
def nonThrowingRateLimitedService: Future[String] = Future { | |
if (rand.nextInt(100) > 50) { | |
"RATELIMIT_EXCEEDED" | |
} | |
else { | |
"1, 2, 3, 4, 5, 6" | |
} | |
} | |
def maybeThrowingRateLimitedService: Future[String] = Future { | |
rand.nextInt(100) match { | |
case x if x > 75 => throw new RateLimitException("Rate limit exceeded") | |
case x if x > 50 => "RATELIMIT_EXCEEDED" | |
case _ => "1, 2, 3, 4, 5, 6" | |
} | |
} | |
} | |
/** ApiService represents some generic, API operation that is executed with [[Future]]s. | |
* Requests submitted to the service are managed via an internal queue and accounting | |
* structures used to ensure that the number of requests made within a given time-window | |
* do not exceed a pre-defined rate-limit. | |
* | |
* @param limit The maximum number of operations to perform within a time-window | |
* @param timeFrame The duration of the time-window | |
* | |
* @tparam T The result-type of requests to the ApiService. | |
*/ | |
class ApiService[T](limit: Long, timeFrame: FiniteDuration) { | |
type LimitDetector = PartialFunction[Either[Throwable, T], Boolean] | |
/** Request (user provided) and meta-data regarding the request, bundled together | |
* for convenience and placed into the queue. This avoid us having to keep more | |
* accounting structures within ApiService. | |
* @param f User provided API call | |
* @param p [[Promise]] to [[Future]] provided to client on [[request]] | |
* @param limitDetector Partial function to detect rate-limit failures | |
* @param startDeadline The deadline for this to start executing | |
*/ | |
case class RequestQueueItem(f: () => Future[T], | |
p: Promise[T], | |
limitDetector: LimitDetector, | |
startDeadline: Option[LocalDateTime]) | |
private var requestQueue: Deque[RequestQueueItem] = new LinkedList[RequestQueueItem]() | |
private var requestCount: Long = 0 | |
private var windowStopTime: LocalDateTime = LocalDateTime.now() | |
private var cleanupTimeFrame: FiniteDuration = timeFrame | |
private var nextCleanupTime: LocalDateTime = LocalDateTime.now() | |
private var limitDetection: LimitDetector = { | |
case Left(_) => true | |
case Right(_) => false | |
} | |
private val timerPool = new ScheduledThreadPoolExecutor(1) | |
@volatile private var recheckFut: Option[ScheduledFuture[_]] = None | |
/** Override the default rate-limit-error detection code. By default it is assumed | |
* that any failed Future is a rate-limit exception and is likely not what you want. | |
*/ | |
def withDefaultLimitDetection(d: LimitDetector): Unit = { | |
limitDetection = d | |
} | |
/** Submit a function as part of the group of requests managed under this service's | |
* rate-limits. The request given will be queued up for execution within the first | |
* available time-window. That window is dependent upon the configured rate-limit | |
* quotas and the current size of the request queue. | |
* | |
* @param f User-defined request function to execute | |
* @param maxQueueTime The maximum time to wait before a | |
* @param d Override the default/configured rate-limit-error detection | |
* | |
* @return [[Future]] representing the eventual execution of [[f]] | |
*/ | |
def request(f: () => Future[T], maxQueueTime: Duration = Duration.Inf) | |
(d: LimitDetector = limitDetection) | |
: Future[T] = this.synchronized { | |
val p = Promise[T]() | |
val startDeadline = | |
if (maxQueueTime.isFinite) Some(LocalDateTime.now().plusSeconds(maxQueueTime.toSeconds)) | |
else None | |
requestQueue.offerLast(RequestQueueItem(f, p, d, startDeadline)) | |
tryRequest() | |
p.future | |
} | |
/** Check capacity and, if possible, execute a request from the head of the queue. */ | |
private def tryRequest(): Unit = this.synchronized { | |
cleanupRequests() | |
if (hasCapacity()) { | |
if (recheckFut.isDefined) { | |
recheckFut.map(_.cancel(true)) | |
recheckFut = None | |
} | |
makeRequest() | |
} else if (recheckFut.isEmpty) { | |
val timeRemaining = ChronoUnit.NANOS.between(LocalDateTime.now(), windowStopTime) | |
recheckFut = Some(timerPool.schedule( | |
(() => { | |
recheckFut = None | |
tryRequest() | |
}): Runnable, | |
timeRemaining, | |
TimeUnit.NANOSECONDS | |
)) | |
} | |
} | |
/** Modify the instance with a custom cleanup time. | |
* By default the [[cleanupTimeFrame]] is the same as the [[timeFrame]] | |
* | |
* @param window The duration at which the instance should clear pending requests | |
* which have exceeded their timeout | |
* @return The current instance (for chaining configuration methods) | |
*/ | |
def withCleanupWindow(window: FiniteDuration): ApiService[T] = { | |
cleanupTimeFrame = window | |
this | |
} | |
/** Scan the queue for all requests that have exceeded their time limit. Because time-limits | |
* are custom to individual requests, the entire queue must be scanned. | |
*/ | |
def cleanupRequests() { | |
val now = LocalDateTime.now() | |
if (now.isAfter(nextCleanupTime)) { | |
requestQueue.removeIf((req) => { | |
val expired = req.startDeadline.exists(now.isAfter) | |
if (expired) { | |
req.p.complete(Failure(new TimeoutException)) | |
} | |
expired | |
}) | |
nextCleanupTime = now.plusSeconds(cleanupTimeFrame.toSeconds) | |
} | |
} | |
/** Execute a request from the head of the queue. If the request fails, due to rate-limit | |
* errors, requeue at the head of the queue and call [[tryRequest]]. If the request is | |
* successful, yield the result back to the caller. | |
* | |
* If the queue is empty, do nothing. | |
*/ | |
private def makeRequest(): Unit = { | |
val now = LocalDateTime.now() | |
var req: RequestQueueItem = null | |
var foundValidRequest = false | |
do { | |
req = requestQueue.pollFirst() | |
if (req == null) return | |
foundValidRequest = req.startDeadline.forall(now.isBefore) | |
} while(!foundValidRequest) | |
requestCount += 1 | |
val result = req.f() | |
// re-queue at the head of the queue on failure and | |
// attempt to make another request | |
def retry(): Unit = this.synchronized { | |
requestQueue.offerFirst(req) | |
tryRequest() | |
} | |
result.onComplete { | |
case Failure(t: Throwable) => | |
if (req.limitDetector(Left(t))) { | |
retry() | |
} else { | |
req.p.complete(Failure(t)) | |
} | |
case Success(r) => | |
if (req.limitDetector(Right(r))) { | |
retry() | |
} else { | |
req.p.complete(Success(r)) | |
} | |
} | |
tryRequest() | |
} | |
/** Check if we have capacity within the current time-window. If we are in a new time-window, | |
* udate our internal accounting structures. | |
* | |
* @return True if a request can be made within the current window, false otherwise | |
*/ | |
private def hasCapacity(): Boolean = { | |
val now = LocalDateTime.now() | |
if (now.isAfter(windowStopTime)) { | |
windowStopTime = now.plusSeconds(timeFrame.toSeconds) | |
requestCount = 0 | |
true | |
} else { | |
requestCount < limit | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment