Create a gist now

Instantly share code, notes, and snippets.

@rodaine /fast.go
Last active Jul 28, 2017

What would you like to do?
Code snippets for my blog post "The X-Files: Controlling Throughput with rate.Limiter" (http://rodaine.com/2017/05/x-files-time-rate-golang/)
// RateLimit middleware limits the throughput to h using TickerLimiter
// configured with the provided rps and burst. A queued request idles
// for up to the given wait duration for a token before being rejected.
func RateLimit(rps, burst int, wait time.Duration, h http.HandlerFunc) http.HandlerFunc {
	ticks, _ := TickerLimiter(rps, burst)
	return func(w http.ResponseWriter, r *http.Request) {
		deadline := time.NewTimer(wait)
		select {
		case <-deadline.C:
			// waited too long for a token; reject the request
			w.WriteHeader(http.StatusTooManyRequests)
			return
		case <-ticks:
			// got a token; stop the timer and serve
			deadline.Stop()
		}
		h(w, r)
	}
}
// HelloWorld is an http.HandlerFunc that calls an upstream service
// and prints "Hello, World!" to the response if successful.
func HelloWorld(w http.ResponseWriter, r *http.Request) {
	err := upstream.Call()
	switch err.(type) {
	case nil:
		// upstream succeeded; write the greeting
		fmt.Fprintln(w, "Hello, World!")
	case upstream.ErrTimeout:
		// known timeout error from upstream
		w.WriteHeader(http.StatusGatewayTimeout)
	default:
		// any other upstream failure
		w.WriteHeader(http.StatusBadGateway)
	}
}
// RateLimit middleware limits the throughput to h using a rate.Limiter
// token bucket configured with the provided rps and burst. The request
// will idle for up to the passed in wait. If the limiter detects the
// deadline will be exceeded, the request is cancelled immediately.
func RateLimit(rps, burst int, wait time.Duration, h http.HandlerFunc) http.HandlerFunc {
	limiter := rate.NewLimiter(rate.Limit(rps), burst)
	return func(w http.ResponseWriter, r *http.Request) {
		// bound the wait by deriving a context with the timeout
		ctx, cancel := context.WithTimeout(r.Context(), wait)
		defer cancel() // always release the context's resources

		// Wait errors out as soon as it can prove a token will not
		// arrive before the deadline, instead of blocking for the
		// entire duration.
		if err := limiter.Wait(ctx); err != nil {
			w.WriteHeader(http.StatusTooManyRequests)
			return
		}
		h(w, r)
	}
}
// Rate-limiting configuration derived from the upstream SLA.
const (
rps = 425 // the SLA maximum
burst = 10 // matches the upstream service's concurrency
)
// Register the rate-limited handler at the root path.
// NOTE(review): a call statement is not valid at package level in Go;
// this snippet presumably belongs inside main() — confirm against the post.
http.HandleFunc("/", RateLimit(rps, burst, HelloWorld))
// TickerLimiter returns a channel with a buffer capacity of burst
// that fills at the provided rps in hertz (1/s). If the limiter is no
// longer used, the returned cancel function must be called to release
// resources.
func TickerLimiter(rps, burst int) (c <-chan time.Time, cancel func()) {
// create the buffered channel and prefill it
c = make(chan time.Time, burst)
for i := 0; i < burst; i++ {
c <- time.Now()
}
// create a ticker with the interval 1/rps
t := time.NewTicker(time.Second / time.Duration(rps))
// add to the channel with each tick
go func() {
for t := range t.C {
select {
case c <- t: // add the tick to channel
default: // channel already full, drop the tick
}
}
close(c) // close channel when the ticker is stopped
}()
return c, t.Stop
}
// RateLimit middleware limits the throughput to h using TickerLimiter
// configured with the provided rps and burst.
func RateLimit(rps, burst int, h http.HandlerFunc) http.HandlerFunc {
	tokens, _ := TickerLimiter(rps, burst)
	return func(w http.ResponseWriter, r *http.Request) {
		// block until a token is available, then serve the request
		<-tokens
		h(w, r)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment