Skip to content

Instantly share code, notes, and snippets.

@hcoa
Last active May 10, 2021 13:51
Show Gist options
  • Save hcoa/a3d1ba963aa2fe6595f5ff859c979556 to your computer and use it in GitHub Desktop.
Save hcoa/a3d1ba963aa2fe6595f5ff859c979556 to your computer and use it in GitHub Desktop.
simple, thread unsafe rate-limiter, and simple token bucket rate-limiter
import java.time.Clock
import scala.annotation.tailrec
import scala.collection.mutable
case class Issue(tokens: Long, timestampMillis: Long)
class NaiveRateLimiter private (capacity: Long, periodMs: Long, clock: Clock) {
private var availableCapacity = capacity
private val issuedTokens = mutable.MutableList[Issue]()
def tryConsume(tokens: Long): Boolean = this.synchronized {
val nowMs = clock.millis()
removeObsolete(nowMs)
if (availableCapacity < tokens) false
else {
issuedTokens += Issue(tokens, nowMs)
availableCapacity -= tokens
true
}
}
private def removeObsolete(now: Long): Unit = {
@tailrec
def removeRec(allTokens: mutable.MutableList[Issue]): Unit = {
allTokens match {
case h +: rest if now - h.timestampMillis > periodMs =>
availableCapacity += h.tokens
removeRec(rest)
case _ => ()
}
}
removeRec(issuedTokens)
}
}
object NaiveRateLimiter {
val clock = Clock.systemDefaultZone()
val naiveRateLimiter = new NaiveRateLimiter(100, 1000, clock)
naiveRateLimiter.tryConsume(10)
}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
// not thread-safe
class SimpleRateLimiter(capacity: Int, period: FiniteDuration) {
private var idx = 0
private val requests =
Array.fill(capacity)(Deadline.now - period) // array of capacity elements
private def lastTime = requests(idx)
private def update(deadline: Deadline): Unit = {
requests(idx) = deadline
idx += 1
if (idx == capacity) idx = 0
}
def invoke[A](f: => Future[A]): Future[A] = {
val now = Deadline.now
if (now - lastTime < period) Future.failed(new Exception("429"))
else {
update(now)
f
}
}
}
object RL {
def main(argv: Array[String]): Unit = {
val rt = new SimpleRateLimiter(10, 1.second)
val start = System.currentTimeMillis()
val updates = new AtomicInteger(0)
while (System.currentTimeMillis() - start < 2000) {
rt.invoke(Future {
updates.getAndAdd(1)
})
}
println(s"updates $updates in 2 seconds")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment