Skip to content

Instantly share code, notes, and snippets.

@marianogappa
Created December 4, 2016 04:53
Show Gist options
  • Save marianogappa/00b8235deffab51271ea4177369cfe2e to your computer and use it in GitHub Desktop.
Save marianogappa/00b8235deffab51271ea4177369cfe2e to your computer and use it in GitHub Desktop.
Example backpressure implementation in Go
/*
This snippet is an example of backpressure implementation in Go.
It doesn't run in Go Playground, because it starts an HTTP Server.
The example starts an HTTP server and sends multiple requests to it. The server starts denying
requests by replying an "X" (i.e. a 502) when its buffered channel reaches capacity.
This is not the same as rate-limiting; you might be interested in https://github.com/juju/ratelimit
or https://godoc.org/golang.org/x/time/rate.
Note that asking the question: `len(ch) < cap(ch) ?` is a racey operation; the channel might
actually be full a microsecond later. In the context of short running requests this is not an
issue.
Outputs:
```
$ go run test.go
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√X√X
√√X√√X√√X√√X√√X√√X√√X√√X√X√√√X√√X√X√√√X√X√√X√√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√√X√√X√√X
√X√√√X√X√√X√√X√√X√√X√X√√X√X√√√X√X√√X√√X√√X√√X√√X√X√√X√√X√√X√√X√X√√X√√X√X√√X√X√√X√√X√√X√√X√√X√
√X√√X√X√√X√√X√√X√√X√√X√X√√X√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√√
```
*/
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
)
func main() {
requests := make(chan request, 100)
go startServer(requests)
go process(requests)
makeRequests(500, 6*time.Millisecond)
}
func startServer(rq chan request) {
http.HandleFunc("/requests", handle(rq))
http.ListenAndServe(":9000", nil)
}
func process(rq chan request) {
for r := range rq {
r.process()
}
}
func makeRequests(count int, cooldown time.Duration) {
wg := sync.WaitGroup{}
for i := 0; i < count; i++ {
go func() {
wg.Add(1)
defer wg.Done()
response, _ := http.Get("http://localhost:9000/requests")
defer response.Body.Close()
b, _ := ioutil.ReadAll(response.Body)
fmt.Print(string(b))
}()
time.Sleep(cooldown)
}
wg.Wait()
}
func handle(rq chan request) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if len(rq) < cap(rq) {
r := newRequest(r)
rq <- r
w.Write(<-r.response)
} else {
w.Write([]byte("X"))
}
}
}
type request struct {
r *http.Request
response chan []byte
}
func newRequest(r *http.Request) request { return request{r, make(chan []byte)} }
func (r request) process() {
time.Sleep(10 * time.Millisecond)
r.response <- []byte("√")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment