Last active
January 18, 2017 22:35
-
-
Save sitano/3d6484127620be9afe0b4b2fd6179b3c to your computer and use it in GitHub Desktop.
Parallel fetcher with time limit constraints example, go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Ivan Prisyazhnyy <john.koepi@gmail.com>, 2017 | |
// | |
// Ambiguities: | |
// - "URL syntax correctness" is not explicitly defined. Should, I | |
// interpret it, like it has to match rfc1738/rfc3986? I will go | |
// with the simplest solution - URL is valid if parsed by util.URL. | |
// - Various behaviors for edge cases (i.e. meeting bad data, unstable | |
// network, too much data) are not well defined. If external service | |
// returns 4/5xx code, it could be retried and etc. | |
// - Answering always within 500 milliseconds - this is the biggest | |
// ambiguity here. In real life scenario this constraint will be | |
// set at the external system (observer). Thus a lot of various | |
// stages will be introduced in-between resulting performance | |
// of the request : network performance, transport protocols, | |
// system's queues, drivers, threads and IO scheduling, GC and | |
// memory management, external services performance, response data | |
// variance and etc. | |
// | |
// Rationale: | |
// I would take the simplest and literal here - 500 ms is the | |
// requirement for the server GET handler to write data back into | |
// the socket, data returned by the services is constant, sort | |
// costs O(1), all ops but IO waiting is O(1), GC costs O(1), | |
// threads and goroutines management costs O(1) and etc. | |
// | |
// Let for now performance does not depend on the system and | |
// service load. | |
// | |
// The nice idea I see here is having self tuning service, which | |
// is able to measure influence of the system average load and | |
// requests' 99th-percentile latency to the SLA, control processing rate,
// timings and etc depending on the system health and meeting SLA. | |
// | |
// Intention: | |
// Implement simple version which is able to process the request in | |
// 500 ms. That is by no means the same as always responding under 500 ms.
// | |
// Handler will: | |
// - Parse and validate request | |
// - Run fetcher in parallel | |
// - Wait for 500 ms - CONST time max | |
// - Preprocess results ASAP (in order of getting it) | |
// - Try to respond before 500 ms elapsed | |
// | |
// Tests: | |
// I will not write automated tests for now. | |
// | |
// I will test with wrk2. | |
// | |
// $ repeat 100 curl -v -w 'Connect: %{time_connect}\nStart transfer: %{time_starttransfer}\nTotal:%{time_total}\n' http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo | |
// | |
// $ repeat 100 curl -v -w 'Connect: %{time_connect}\nStart transfer: %{time_starttransfer}\nTotal:%{time_total}\n' http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo\&u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/odd\&u=http://127.0.0.1:8090/rand\&u=http://127.0.0.1:8090/rand | |
// | |
// as for github.com/giltene/wrk2 | |
// $ ./wrk -t2 -c800 -d30s -R800 -L -U --timeout 1s http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo | |
// | |
// Quick start guide: | |
// $ go run 500ms-fetcher.go --debug | |
// | |
// To run CPU profiler: | |
// $ go build ./500ms-fetcher.go | |
// $ env GODEBUG=gctrace=1 ./500ms-fetcher -cpuprofile=test.prof | |
// $ go tool pprof ./test.prof | |
// | |
package main | |
import ( | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io/ioutil" | |
"log" | |
"net" | |
"net/http" | |
"net/url" | |
"os" | |
"os/signal" | |
"runtime" | |
"runtime/pprof" | |
"runtime/trace" | |
"sort" | |
"strconv" | |
"sync/atomic" | |
"syscall" | |
"time" | |
_ "net/http/pprof" | |
) | |
// Command-line configuration knobs.
var bind = flag.String("bind", ":8000", "bind address (--bind 127.0.0.1:8000)")
var timeout = flag.Duration("timeout", 500*time.Millisecond, "request processing maximum time (--timeout 500ms)")
var rtc = flag.Duration("rtc", 5*time.Millisecond, "const approximation of data processing overhead (--rtc 10ms)")
var debug = flag.Bool("debug", false, "Enable verbose logging")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
var traceprofile = flag.String("traceprofile", "", "write trace profile to `file`")
// reqCounter assigns ids to requests for debug log correlation; it is
// incremented atomically and only when --debug is enabled.
var reqCounter uint64
// Message is the JSON payload served by /numbers and expected from the
// upstream services: {"numbers": [1, 2, 3]}.
type Message struct {
	Numbers []int `json:"numbers"`
}
func main() { | |
flag.Parse() | |
if *cpuprofile != "" { | |
f, err := os.Create(*cpuprofile) | |
if err != nil { | |
log.Fatal(err) | |
} | |
pprof.StartCPUProfile(f) | |
defer pprof.StopCPUProfile() | |
} | |
if *traceprofile != "" { | |
f, err := os.Create(*traceprofile) | |
if err != nil { | |
log.Fatal(err) | |
} | |
trace.Start(f) | |
defer trace.Stop() | |
} | |
s := &http.Server{ | |
Addr: *bind, | |
Handler: nil, | |
} | |
http.HandleFunc("/numbers", numbersHandler) | |
// allow printing stack traces on SIGQUIT sig | |
listenSIGQUIT() | |
// allow soft http server shutdown on Interrupt sig | |
// to be able to write cpu prof smoothly | |
shutdown := makeShutdownCh() | |
if err := ListenAndServe(s, shutdown); err != nil { | |
// Detect if channel is closed | |
ok := true | |
select { | |
case _, ok = <-shutdown: | |
default: | |
} | |
// Means shutdown is in progress - skip error | |
if ok { | |
log.Println(err) | |
} | |
} | |
} | |
// GET /numbers?u=... handler | |
func numbersHandler(w http.ResponseWriter, r *http.Request) { | |
if r.Method != http.MethodGet { | |
http.NotFound(w, r) | |
return | |
} | |
start := time.Now() | |
msg := &Message{ | |
Numbers: []int{}, | |
} | |
// Parse | |
u := r.URL.Query()["u"] | |
reqs := make([]string, 0, len(u)) | |
for _, raw := range u { | |
if _, err := url.ParseRequestURI(raw); err != nil { | |
log.Println("Error parsing", raw, ":", err) | |
} else { | |
reqs = append(reqs, raw) | |
} | |
} | |
w.Header().Set("Content-Type", "application/json") | |
// Empty | |
if len(reqs) == 0 { | |
writeJSON(msg, w) | |
return | |
} | |
// Execute | |
id := uint64(0) | |
if *debug { | |
id = atomic.AddUint64(&reqCounter, 1) | |
} | |
pass := time.Now().Sub(start) + *rtc | |
if pass >= *timeout { | |
// Already behind the limit. returning { "numbers": [] } is fine | |
// but want to see 500 in load tests | |
log.Println("Parsing request run out of time") | |
http.Error(w, "Parsing request run out of time", http.StatusInternalServerError) | |
// This is nice place to snapshot memory under heavy load | |
if *memprofile != "" { | |
f, err := os.Create(*memprofile) | |
if err != nil { | |
log.Fatal("could not create memory profile: ", err) | |
} | |
runtime.GC() // get up-to-date statistics | |
if err := pprof.WriteHeapProfile(f); err != nil { | |
log.Fatal("could not write memory profile: ", err) | |
} | |
f.Close() | |
} | |
return | |
} | |
waitDuration := *timeout - pass | |
wait := time.After(waitDuration) | |
res := getServicesResponse(id, reqs, waitDuration) | |
// Wait | |
waitLoop: | |
for { | |
select { | |
// it tries hard to return results asap | |
case resp, ok := <-res: | |
if ok { | |
// next result | |
msg.Numbers = resp | |
} else { | |
// finished | |
break waitLoop | |
} | |
case <-wait: | |
// finished | |
break waitLoop | |
} | |
} | |
// Response | |
writeJSON(msg, w) | |
if *debug { | |
log.Println(id, ":", r.URL.String(), "time=", time.Now().Sub(start)) | |
} | |
} | |
// getServicesResponse sends requests in parallel and tries to preprocess | |
// response as soon as possible in time. | |
// | |
// I use single <collect> channel here instead of per-worker-result channel because | |
// I don't want to do multisig waits via reflect. But rational here is under | |
// the question. After all, I don't want to allocate more channels and I assume | |
// most of the services will respond in time. | |
func getServicesResponse(id uint64, reqs []string, timeout time.Duration) <-chan []int { | |
wait := time.After(timeout) | |
resp := make(chan []int, len(reqs)) | |
agg := make(chan []int) | |
for _, req := range reqs { | |
go worker(id, req, timeout, agg) | |
} | |
go func() { | |
numbers := []int{} | |
finished := false | |
done := 0 | |
for { | |
select { | |
case res := <-agg: | |
if !finished { | |
// Aggregation | |
// I am super lazy here not removing dups, | |
// but instead, I will use smart serializer | |
numbers = append(numbers, res...) | |
// Sort is not effective here, but ok for the demo purposes, right? | |
sort.Ints(numbers) | |
resp <- numbers | |
} | |
// if all workers finished(), close channels and exit | |
done++ | |
if done == len(reqs) { | |
close(agg) | |
if !finished { | |
close(resp) | |
} | |
if *debug { | |
log.Println("[DEBUG]", id, ":", "finished processing", finished) | |
} | |
return | |
} | |
case <-wait: | |
finished = true | |
// let caller to proceed | |
close(resp) | |
// but collect must wait workers tail to finish | |
} | |
} | |
}() | |
return resp | |
} | |
// worker makes request to the service and returns numbers | |
func worker(id uint64, req string, timeout time.Duration, res chan<- []int) { | |
if *debug { | |
log.Println("[DEBUG]", id, ":", "start worker", req) | |
} | |
var body []byte | |
start := time.Now() | |
msg := &Message{ | |
Numbers: []int{}, | |
} | |
defer func() { | |
if msg.Numbers == nil { | |
msg.Numbers = []int{} | |
} | |
res <- msg.Numbers | |
if *debug { | |
log.Println("[DEBUG]", id, ":", "finished worker", req, "body", string(body), "time=", time.Now().Sub(start)) | |
} | |
}() | |
// Get | |
client := &http.Client{ | |
Timeout: timeout, | |
} | |
resp, err := client.Get(req) | |
if err != nil { | |
if *debug { | |
log.Println("[ERROR]", id, ":", "error fetching", req, ":", err) | |
} | |
return | |
} | |
// Read | |
body, err = ioutil.ReadAll(resp.Body) | |
resp.Body.Close() | |
if err != nil { | |
if *debug { | |
log.Println("[ERROR]", id, ":", "error reading", req, "body:", err) | |
} | |
return | |
} | |
if err = json.Unmarshal(body, msg); err != nil { | |
if *debug { | |
log.Println("[ERROR]", id, ":", "error unmarshalling", req, "body", string(body), ":", err) | |
} | |
return | |
} | |
} | |
// writeJSON serializes Message and skips duplicates in sorted array | |
func writeJSON(m *Message, w http.ResponseWriter) { | |
w.WriteHeader(http.StatusOK) | |
// sorry, I would skip errors checking here | |
// have no idea for now is it buffered | |
w.Write([]byte("{\"numbers\":[")) | |
if len(m.Numbers) > 0 { | |
last := m.Numbers[0] | |
w.Write([]byte(strconv.Itoa(last))) | |
for _, num := range m.Numbers { | |
if num != last { | |
w.Write([]byte(",")) | |
w.Write([]byte(strconv.Itoa(num))) | |
last = num | |
} | |
} | |
} | |
w.Write([]byte("]}\n")) | |
} | |
// ListenAndServe listens on the TCP network address and | |
// calls Serve to handle requests on incoming connections. If | |
// srv.Addr is blank, ":http" is used. | |
func ListenAndServe(srv *http.Server, shutdown <-chan struct{}) error { | |
addr := srv.Addr | |
if addr == "" { | |
addr = ":http" | |
} | |
ln, err := net.Listen("tcp", addr) | |
if err != nil { | |
return err | |
} | |
log.Println("Server bind to", addr) | |
go handleInterrupt(shutdown, srv, ln) | |
return srv.Serve(TCPKeepAliveListener{ln.(*net.TCPListener)}) | |
} | |
// https://github.com/tylerb/graceful | |
func handleInterrupt(interrupt <-chan struct{}, srv *http.Server, listener net.Listener) { | |
<-interrupt | |
srv.SetKeepAlivesEnabled(false) | |
_ = listener.Close() | |
} | |
// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted | |
// connections. It's used by ListenAndServe and ListenAndServeTLS so | |
// dead TCP connections (e.g. closing laptop mid-download) eventually | |
// go away. | |
type TCPKeepAliveListener struct { | |
*net.TCPListener | |
} | |
func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) { | |
tc, err := ln.AcceptTCP() | |
if err != nil { | |
return | |
} | |
tc.SetKeepAlive(true) | |
tc.SetKeepAlivePeriod(3 * time.Minute) | |
return tc, nil | |
} | |
// makeShutdownCh returns a channel to be used for shutdown notification:
// on the first SIGINT/SIGTERM it receives one message and is then
// closed, after which default signal handling is restored so a second
// signal terminates the process hard.
func makeShutdownCh() <-chan struct{} {
	resultCh := make(chan struct{}, 1)
	signalCh := make(chan os.Signal, 4)
	// BUG fix: os.Kill (SIGKILL) was registered here too, but SIGKILL
	// can never be caught or handled, so it has been dropped.
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-signalCh
		signal.Stop(signalCh)
		log.Println("Got interrupt signal...")
		resultCh <- struct{}{}
		close(resultCh)
	}()
	return resultCh
}
// listenSIGQUIT installs a handler that dumps stack traces of all
// goroutines to stdout on every SIGQUIT, without terminating the
// process (handy with `kill -QUIT <pid>` or Ctrl-\).
func listenSIGQUIT() {
	// BUG fix: the channel was unbuffered; signal.Notify never blocks
	// when delivering, so a SIGQUIT arriving while the goroutine was
	// busy printing was silently dropped. os/signal requires at least
	// a buffer of one.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGQUIT)
	go func() {
		// Reused scratch buffer; 5 MiB is plenty for a full dump.
		stacktrace := make([]byte, 5*1024*1024)
		for range signalCh {
			length := runtime.Stack(stacktrace, true)
			fmt.Println(string(stacktrace[:length]))
		}
	}()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment