Code snippets for my blog post "The X-Files: Controlling Throughput with rate.Limiter" (http://rodaine.com/2017/05/x-files-time-rate-golang/)
// RateLimit middleware limits the throughput to h using TickerLimiter | |
// configured with the provided rps and burst. The request will idle | |
// for the passed in wait before cancelling if there is a queue. | |
func RateLimit(rps, burst int, wait time.Duration, h http.HandlerFunc) http.HandlerFunc { | |
l, _ := TickerLimiter(rps, burst) | |
return func(w http.ResponseWriter, r *http.Request) { | |
t := time.NewTimer(wait) | |
select { | |
case <-l: | |
t.Stop() | |
case <-t.C: // wait deadline reached, cancel request | |
w.WriteHeader(http.StatusTooManyRequests) | |
return | |
} | |
h(w, r) | |
} | |
} |
// HelloWorld is an http.HandlerFunc that calls an upstream service | |
// and prints "Hello, World!" to the response if successful. | |
func HelloWorld(w http.ResponseWriter, r *http.Request) { | |
switch err := upstream.Call(); err.(type) { | |
case nil: // no error | |
fmt.Fprintln(w, "Hello, World!") | |
case upstream.ErrTimeout: // known timeout error | |
w.WriteHeader(http.StatusGatewayTimeout) | |
default: // unknown error | |
w.WriteHeader(http.StatusBadGateway) | |
} | |
} |
// RateLimit middleware limits the throughput to h using a rate.Limiter | |
// token bucket configured with the provided rps and burst. The request | |
// will idle for up to the passed in wait. If the limiter detects the | |
// deadline will be exceeded, the request is cancelled immediately. | |
func RateLimit(rps, burst int, wait time.Duration, h http.HandlerFunc) http.HandlerFunc { | |
l := rate.NewLimiter(rate.Limit(rps), burst) | |
return func(w http.ResponseWriter, r *http.Request) { | |
// create a new context from the request with the wait timeout | |
ctx, cancel := context.WithTimeout(r.Context(), wait) | |
defer cancel() // always cancel the context! | |
// Wait errors out if the request cannot be processed within | |
// the deadline. This is preemptive, instead of waiting the | |
// entire duration. | |
if err := l.Wait(ctx); err != nil { | |
w.WriteHeader(http.StatusTooManyRequests) | |
return | |
} | |
h(w, r) | |
} | |
} |
const ( | |
rps = 425 // the SLA maximum | |
burst = 10 // matches the upstream services concurrency | |
) | |
http.HandleFunc("/", RateLimit(rps, burst, HelloWorld)) |
// TickerLimiter returns a channel with a buffer capacity of burst that
// starts full and is refilled at the provided rps in hertz (1/s). Each
// value received from the channel is one token; ticks that arrive while
// the buffer is full are dropped.
//
// A non-positive rps disables refilling, so only the initial burst tokens
// are ever available. A negative burst is treated as zero.
//
// If the limiter is no longer used, the returned cancel function must be
// called to release resources. It stops the ticker, ends the refill
// goroutine and closes the channel, so receives no longer block after
// cancellation. cancel may be called more than once and from multiple
// goroutines.
func TickerLimiter(rps, burst int) (c <-chan time.Time, cancel func()) {
	if burst < 0 {
		burst = 0
	}

	// Keep a bidirectional handle locally: the named result is
	// receive-only and cannot be sent on or closed.
	tokens := make(chan time.Time, burst)
	for i := 0; i < burst; i++ {
		tokens <- time.Now()
	}

	// A one-slot signal channel with a non-blocking send makes cancel
	// idempotent without extra synchronization.
	stop := make(chan struct{}, 1)
	cancel = func() {
		select {
		case stop <- struct{}{}:
		default: // cancellation already requested
		}
	}

	// ticks stays nil when refilling is disabled; receiving from a nil
	// channel blocks forever, so only stop can wake the goroutine.
	var (
		ticker *time.Ticker
		ticks  <-chan time.Time
	)
	if rps > 0 {
		interval := time.Second / time.Duration(rps)
		if interval <= 0 {
			// rps above 1e9 rounds down to zero, which NewTicker rejects.
			interval = time.Nanosecond
		}
		ticker = time.NewTicker(interval)
		ticks = ticker.C
	}

	go func() {
		// Ticker.Stop does not close ticker.C, so the loop is ended by
		// the stop signal instead of by ranging over the ticker channel.
		defer close(tokens)
		if ticker != nil {
			defer ticker.Stop()
		}
		for {
			select {
			case <-stop:
				return
			case tick := <-ticks:
				select {
				case tokens <- tick: // add the tick to the bucket
				default: // bucket already full, drop the tick
				}
			}
		}
	}()

	return tokens, cancel
}
// RateLimit middleware limits the throughput to h using TickerLimiter | |
// configured with the provided rps and burst. | |
func RateLimit(rps, burst int, h http.HandlerFunc) http.HandlerFunc { | |
l, _ := TickerLimiter(rps, burst) | |
return func(w http.ResponseWriter, r *http.Request) { | |
<-l // h is blocked by the TickerLimiter | |
h(w, r) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment