Skip to content

Instantly share code, notes, and snippets.

@DeLaGuardo
Last active June 16, 2021 11:41
Show Gist options
  • Save DeLaGuardo/808353f70475d96cdd4a484e65da0a0e to your computer and use it in GitHub Desktop.
Save DeLaGuardo/808353f70475d96cdd4a484e65da0a0e to your computer and use it in GitHub Desktop.
(ns user
(:require [clojure.core.async :as async]))
(defn now-millis []
(System/currentTimeMillis))
(defn throttled-chan
"Returns a channel that will throttle attempts to take items according to the rate-limits.
rate-limits should be a sequence of pairs of integers. The first item in each pair
is describing how many items is possible to take, at most, from the input for the interval
defined as second item in pair in milliseconds.
Output channel will close if either input channel is drained/closed or rate-limits are empty.
The output channle is unbuffered by default, unless buf-or-n is given."
([rate-limits ch] (throttled-chan rate-limits ch nil))
([rate-limits ch buf-or-n]
(let [out-ch (async/chan buf-or-n)]
(async/go
(loop [now (now-millis)
[[rate interval] & rate-limits :as rls] rate-limits
timeout interval]
(if (seq rls)
(let [t-ch (async/timeout timeout)
ctrl-ch (async/go
(loop [x 0]
(if (< x rate)
(let [[value port] (async/alts! [t-ch ch])]
(condp = port
t-ch ::timeout
ch (when (some? value)
(async/>! out-ch value)
(recur (inc x)))))
(do (async/<! t-ch)
::timeout))))
ctrl (async/<! ctrl-ch)
now' (now-millis)]
(if (= ::timeout ctrl)
(recur now' rate-limits (- (second (first rate-limits)) (- now' (+ now timeout))))
(async/close! out-ch)))
(async/close! out-ch))))
out-ch)))
(comment
(def current-rate-limit
"one request per second"
(volatile! [1 1000]))
(defn rate-limits []
(lazy-seq
(cons @current-rate-limit (rate-limits))))
(let [in-ch (async/chan)
out-ch (throttled-chan (rate-limits) in-ch)]
(async/go-loop [counter 0]
(when-let [v (async/<! out-ch)]
(prn (format "%02d %s" v (now-millis)))
(when (= counter 10)
;; Increase rate-limit - now it should be 10 rps
(vreset! current-rate-limit [10 1000]))
(recur (inc counter))))
(async/go-loop [[item & items] (range 50)]
(if (some? item)
(do (async/>! in-ch item)
(recur items))
(async/close! in-ch))))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment