Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?

rate limiting

The reality of the distributed, service-oriented world we live in is that we have to contend with the consequences of not only our requests, but the requests of all of the other clients of that service. For instance, many services have maximum request rates to protect the availability of a service from overambitious clients. An API might say "make no more than 10 requests per second".

How do you enforce that? My server has 24 cores, and I run as many threads. How do those threads coordinate so they don't go over the limit? It gets more complicated when you scale to multiple machines. Let's keep it simple and just talk about the shared-memory case.

There's a nice algorithm for implementing rate limiting called token bucket. If you can't make more than 10 requests per second, you make a rule: no one can make a request unless they have a token. Now dole out the tokens at no more than 10 requests per second. It's a great way to centralize the control of the rate.

Your mission, should you choose to accept it, is to implement token bucket. Make sure it works well with multiple threads. It's surprisingly easy in Clojure with a thread and an atom, but there are other ways to do it. Core.async makes it a cinch.

Extra credit for then making a higher-order function that rate limits a function.

(ns token-bucket.main)
(defn token-bucket
"Returns a function that takes no arguments and returns whether the bucket
has a token for you. Tokens are available at a rate of `per-second` tokens
per second."
[per-second]
(let [bucket (atom {:tokens per-second
; :v for version, which is just a mechanism for saying
; whether we're returning a new token without having to
; store the contents of the bucket.
:v 0
:ts (System/currentTimeMillis)})
ms-per-token (quot 1000 per-second)]
(fn []
(let [[old new] (swap-vals!
bucket
(fn [{:keys [tokens v ts]}]
(let [now (System/currentTimeMillis)
elapsed (- now ts)
[new-tokens ms-since-last-token] ((juxt quot rem) elapsed ms-per-token)]
{:tokens (max 0 (dec (min per-second (+ tokens new-tokens))))
; If there was already a token available, or a new
; token has been made available, bump the "version".
:v (if (or (pos? tokens) (pos? new-tokens)) (inc v) v)
:ts (- now ms-since-last-token)})))]
(< (:v old) (:v new))))))
(defn limit
"Returns a new function that wraps `f`, restricting its calling more than
`per-second` times per second. Simply returns nil when it's been rate
limited."
[per-second f]
(let [get-token (token-bucket per-second)]
(fn [& args] (when (get-token) (apply f args)))))
(comment
(def copacetic? (constantly true))
(def limited-copacetic? (limit 10 copacetic?))
(let [copes (atom 0)]
(dotimes [_ 100]
(.start (Thread. #(when (limited-copacetic?) (swap! copes inc))))
(Thread/sleep 10))
(println @copes)))
(require [clojure.core.async :refer :all])
(defn limit-rate [r b f]
(let [bucket (chan (dropping-buffer b))] ;; burstiness
(go
(while true
(>! bucket :token)
(<! (timeout (int (/ 1000 r)))))) ;; rate
(fn [& args]
(<!! bucket) ;; block for a token
(apply f args))))
(def token-bucket (chan (dropping-buffer 10)))
;; put tokens in the bucket at the rate of 10 per second
(go-loop []
(<! (timeout 100)) ; 100ms = 10 per second
(>! token-bucket :t)
(recur))
;; 2 clients with aggregate request rate below limit - not throttled
;; result: 5 tokens granted per second, half the max allowed.
(do
(go-loop [] ; client 1
(<! (timeout 333)) ; request 3 per second
(<! token-bucket)
(recur))
(go-loop [] ; client 2
(<! (timeout 500)) ; request 2 per second
(<! token-bucket)
(recur)))
;; 2 clients with aggregate request rate above limit - throttled to 10/per second
;; result: 10 tokens granted per second, max allowed.
(do
(go-loop [] ; client 1
(<! (timeout 125)) ; request 8 per second
(<! token-bucket)
(recur))
(go-loop [] ; client 2
(<! (timeout 100)) ; request 10 per second
(<! token-bucket)
(recur)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment