Skip to content

Instantly share code, notes, and snippets.

@woodsaj
Created September 14, 2017 10:00
Show Gist options
  • Save woodsaj/ab06ed4d7f894587ba33fbbca1875299 to your computer and use it in GitHub Desktop.
Save woodsaj/ab06ed4d7f894587ba33fbbca1875299 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
"github.com/gorilla/handlers"
)
// tr is the HTTP transport used for all outgoing requests. It is kept in its
// own variable so code in this file can refer to the transport directly.
var tr = &http.Transport{}

// client is the single HTTP client shared by every outgoing request; it sends
// its requests through tr.
var client = &http.Client{Transport: tr}
// main registers the /getdata and /render handlers and serves them on :1111,
// wrapping the mux in a logging handler that writes to stdout.
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/getdata", getdata)
	mux.HandleFunc("/render", render)

	log.Printf("listening on :1111")
	// ListenAndServe only returns when the server could not start or has
	// stopped (for example because the port is already in use). Report that
	// instead of exiting silently with a zero status.
	if err := http.ListenAndServe(":1111", handlers.CombinedLoggingHandler(os.Stdout, mux)); err != nil {
		log.Fatalf("http server stopped: %v", err)
	}
}
// getdata simulates a slow backend. It expects an "id" request parameter
// holding an integer, waits that many seconds and then echoes the id back in
// the response body.
//
// Responses:
//   - 400 if "id" is missing or is not a base-10 integer
//   - 500 ("fake error") if id is 2, to simulate a failing backend
//   - 499 if the request context is canceled before the wait has elapsed
//   - 200 with the id as the body otherwise
func getdata(w http.ResponseWriter, req *http.Request) {
	id := req.FormValue("id")
	wait, err := strconv.ParseInt(id, 10, 64)
	if err != nil {
		// A malformed id is the caller's mistake, not a server failure.
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	log.Printf("backend: processing request with id: %s", id)
	if wait == 2 {
		writeError(w, http.StatusInternalServerError, "fake error")
		return
	}

	// Use an explicit timer instead of time.After so that it is stopped as
	// soon as the handler returns, rather than staying pending for the full
	// duration when the request is canceled first.
	timer := time.NewTimer(time.Second * time.Duration(wait))
	defer timer.Stop()

	// Wait for our specified duration or until the request context is
	// canceled. The latter happens when the client closes the connection
	// before we send our response.
	select {
	case <-timer.C:
		w.Write([]byte(id))
	case <-req.Context().Done():
		log.Printf("backend: request was canceled.")
		writeError(w, 499, "request canceled")
	}
}
// render fans out to four backend requests (ids 1 through 4) concurrently via
// getRemote and writes the combined results to w, one "<id>: <data>" line per
// request in the order the requests complete.
//
// If any request reports an error, render replies with 500 and that error's
// text and stops collecting results. The shared context is canceled when
// render returns, so requests that are still in flight can abort.
func render(w http.ResponseWriter, req *http.Request) {
	const numRequests = 4

	// Derive a cancelable context that is handed to every getRemote call.
	// The deferred cancel releases the context on the success path and, on
	// the error path, tells the remaining workers to give up.
	reqContext, cancel := context.WithCancel(req.Context())
	defer cancel()

	type result struct {
		data []byte
		err  error
	}
	// One buffer slot per worker: every worker can always deliver its result
	// and exit, even after render has returned early on an error. A smaller
	// buffer would leave workers blocked on the send forever.
	results := make(chan result, numRequests)

	var wg sync.WaitGroup
	for i := 1; i <= numRequests; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// getRemote receives both the context and its cancel func so
			// that it can abort its peers when it fails.
			data, err := getRemote(reqContext, cancel, i)
			results <- result{[]byte(fmt.Sprintf("%d: %s\n", i, data)), err}
		}(i)
	}

	// Close the results channel once every worker has reported, which ends
	// the collection loop below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var resp []byte
	for r := range results {
		if r.err != nil {
			log.Printf("frontend: work returned error. %s", r.err.Error())
			writeError(w, http.StatusInternalServerError, r.err.Error())
			return
		}
		resp = append(resp, r.data...)
	}
	w.Write(resp)
}
// getRemote requests http://localhost:1111/getdata?id=<i> through the shared
// client and returns the response body.
//
// The request is bound to ctx, so net/http aborts it as soon as ctx is
// canceled. A request that was aborted this way is not treated as a failure
// of its own: getRemote returns (nil, nil) so that the caller keeps reporting
// the error that triggered the cancellation.
//
// When the request itself fails, the body cannot be read, or the backend
// answers with a status >= 300, getRemote calls cancel so that peer requests
// sharing ctx stop as well, and returns the error. For a status >= 300 the
// response body is returned together with an error holding the status line.
func getRemote(ctx context.Context, cancel context.CancelFunc, i int) ([]byte, error) {
	req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost:1111/getdata?id=%d", i), nil)
	if err != nil {
		cancel()
		return nil, err
	}
	// Attaching ctx replaces the deprecated Transport.CancelRequest: the
	// client aborts the request on cancellation by itself, so no helper
	// goroutine is needed and no response can be dropped unclosed.
	req = req.WithContext(ctx)

	log.Printf("getRemote: Doing http request with id=%d", i)
	resp, err := client.Do(req)
	if err != nil {
		if ctx.Err() != nil {
			log.Printf("getRemote: peer request was cancled. termintion job with id=%d", i)
			return nil, nil
		}
		log.Printf("getRemote: request id=%d failed. canceling peer requets.", i)
		cancel()
		return nil, err
	}
	defer resp.Body.Close()

	out, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		// Cancellation can also interrupt the body read; handle it exactly
		// like a cancellation that arrives before the response headers.
		if ctx.Err() != nil {
			log.Printf("getRemote: peer request was cancled. termintion job with id=%d", i)
			return nil, nil
		}
		log.Printf("getRemote: request id=%d failed. canceling peer requets.", i)
		cancel()
		return nil, err
	}

	if resp.StatusCode >= 300 {
		log.Printf("getRemote: job id=%d got non 200 response. %s", i, out)
		cancel()
		// The status line comes from the server and must not be used as a
		// format string.
		return out, fmt.Errorf("%s", resp.Status)
	}
	fmt.Printf("getRemote: job with id=%d got Response: %s\n", i, out)
	return out, nil
}
// writeError replies with the given HTTP status code and uses the err text as
// the response body.
func writeError(w http.ResponseWriter, code int, err string) {
	body := []byte(err)
	w.WriteHeader(code)
	w.Write(body)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment