Parallel fetcher with time-limit constraints, example in Go
// Ivan Prisyazhnyy <john.koepi@gmail.com>, 2017
//
// Ambiguities:
// - "URL syntax correctness" is not explicitly defined. Should, I
// interpret it, like it has to match rfc1738/rfc3986? I will go
// with the simplest solution - URL is valid if parsed by util.URL.
// - Various behaviors for edge cases (i.e. meeting bad data, unstable
// network, too much data) are not well defined. If external service
// returns 4/5xx code, it could be retried and etc.
// - Always answering within 500 milliseconds is the biggest ambiguity
//   here. In a real-life scenario this constraint would be measured by
//   an external system (the observer), so many stages sit between the
//   handler and the observed latency: network performance, transport
//   protocols, system queues, drivers, thread and IO scheduling, GC and
//   memory management, external service performance, response data
//   variance, and so on.
//
// Rationale:
// I take the simplest, literal interpretation: 500 ms is the deadline
// for the server's GET handler to write data back into the socket, the
// data returned by the services is constant in size, sorting costs O(1),
// every operation except IO waiting costs O(1), GC costs O(1), thread
// and goroutine management costs O(1), and so on.
//
// For now, let performance not depend on system and service load.
//
// A nice idea here would be a self-tuning service that measures how
// the system's average load and the requests' 99th-percentile latency
// affect the SLA, and controls the processing rate, timings, etc.
// according to system health while still meeting the SLA.
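//
// A minimal sketch of that idea (illustrative only, not implemented
// below): track an EWMA of handler latency and tighten the per-request
// budget as it drifts toward the SLA:
//
//   // budget shrinks the allowed time as the observed latency
//   // (ewma, assumed to be maintained elsewhere) approaches the
//   // SLA, never dropping below half the SLA.
//   func budget(sla, ewma time.Duration) time.Duration {
//       b := sla - ewma/2
//       if b < sla/2 {
//           b = sla / 2
//       }
//       return b
//   }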
//
// Intention:
// Implement a simple version that is able to process the request in
// 500 ms. That is far from the same as responding within 500 ms.
//
// Handler will:
// - Parse and validate request
// - Run fetcher in parallel
// - Wait for at most 500 ms (a constant time budget)
// - Preprocess results as soon as possible (in order of arrival)
// - Try to respond before the 500 ms elapse
//
// Tests:
// I will not write automated tests for now.
//
// I will test with wrk2.
//
// $ repeat 100 curl -v -w 'Connect: %{time_connect}\nStart transfer: %{time_starttransfer}\nTotal:%{time_total}\n' http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo
//
// $ repeat 100 curl -v -w 'Connect: %{time_connect}\nStart transfer: %{time_starttransfer}\nTotal:%{time_total}\n' http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo\&u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/odd\&u=http://127.0.0.1:8090/rand\&u=http://127.0.0.1:8090/rand
//
// Using github.com/giltene/wrk2:
// $ ./wrk -t2 -c800 -d30s -R800 -L -U --timeout 1s http://127.0.0.1:8000/numbers\?u=http://127.0.0.1:8090/primes\&u=http://127.0.0.1:8090/fibo
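//
// For local testing, the upstream endpoints above (/primes, /fibo, /odd,
// /rand on :8090) are assumed to be served by a trivial mock like this
// (not part of this gist; names, port and payload are illustrative):
//
//   package main
//
//   import "net/http"
//
//   func main() {
//       http.HandleFunc("/primes", func(w http.ResponseWriter, r *http.Request) {
//           w.Header().Set("Content-Type", "application/json")
//           w.Write([]byte(`{"numbers":[2,3,5,7,11,13]}`))
//       })
//       http.ListenAndServe(":8090", nil)
//   }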
//
// Quick start guide:
// $ go run 500ms-fetcher.go --debug
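//
// Example request against the running server (assuming the mock
// upstream sketched above is listening on :8090):
//
// $ curl 'http://127.0.0.1:8000/numbers?u=http://127.0.0.1:8090/primes'
// {"numbers":[2,3,5,7,11,13]}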
//
// To run CPU profiler:
// $ go build ./500ms-fetcher.go
// $ env GODEBUG=gctrace=1 ./500ms-fetcher -cpuprofile=test.prof
// $ go tool pprof ./test.prof
//
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"runtime"
	"runtime/pprof"
	"runtime/trace"
	"sort"
	"strconv"
	"sync/atomic"
	"syscall"
	"time"

	_ "net/http/pprof"
)
var bind = flag.String("bind", ":8000", "bind address (--bind 127.0.0.1:8000)")
var timeout = flag.Duration("timeout", 500*time.Millisecond, "request processing maximum time (--timeout 500ms)")
var rtc = flag.Duration("rtc", 5*time.Millisecond, "const approximation of data processing overhead (--rtc 5ms)")
var debug = flag.Bool("debug", false, "Enable verbose logging")
var cpuprofile = flag.String("cpuprofile", "", "write cpu profile `file`")
var memprofile = flag.String("memprofile", "", "write memory profile to `file`")
var traceprofile = flag.String("traceprofile", "", "write trace profile to `file`")
var reqCounter uint64
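// Message maps to the JSON payload exchanged with the services and
// returned by this server, for example:
//
//   {"numbers":[1,2,3,5,8,13]}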
type Message struct {
	Numbers []int `json:"numbers"`
}
func main() {
	flag.Parse()

	if *cpuprofile != "" {
		f, err := os.Create(*cpuprofile)
		if err != nil {
			log.Fatal(err)
		}
		pprof.StartCPUProfile(f)
		defer pprof.StopCPUProfile()
	}

	if *traceprofile != "" {
		f, err := os.Create(*traceprofile)
		if err != nil {
			log.Fatal(err)
		}
		trace.Start(f)
		defer trace.Stop()
	}

	s := &http.Server{
		Addr:    *bind,
		Handler: nil,
	}
	http.HandleFunc("/numbers", numbersHandler)

	// allow printing stack traces on SIGQUIT
	listenSIGQUIT()

	// allow soft http server shutdown on interrupt,
	// so the cpu profile can be written out cleanly
	shutdown := makeShutdownCh()

	if err := ListenAndServe(s, shutdown); err != nil {
		// Detect whether the shutdown channel is closed
		ok := true
		select {
		case _, ok = <-shutdown:
		default:
		}
		// A closed channel means shutdown is in progress - skip the error
		if ok {
			log.Println(err)
		}
	}
}
// GET /numbers?u=... handler
func numbersHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.NotFound(w, r)
		return
	}

	start := time.Now()
	msg := &Message{
		Numbers: []int{},
	}

	// Parse
	u := r.URL.Query()["u"]
	reqs := make([]string, 0, len(u))
	for _, raw := range u {
		if _, err := url.ParseRequestURI(raw); err != nil {
			log.Println("Error parsing", raw, ":", err)
		} else {
			reqs = append(reqs, raw)
		}
	}

	w.Header().Set("Content-Type", "application/json")

	// Empty
	if len(reqs) == 0 {
		writeJSON(msg, w)
		return
	}

	// Execute
	id := uint64(0)
	if *debug {
		id = atomic.AddUint64(&reqCounter, 1)
	}

	pass := time.Since(start) + *rtc
	if pass >= *timeout {
		// Already behind the limit. Returning { "numbers": [] } would be
		// fine, but I want to see 500s in load tests.
		log.Println("Parsing the request ran out of time")
		http.Error(w, "Parsing the request ran out of time", http.StatusInternalServerError)

		// This is a nice place to snapshot memory under heavy load
		if *memprofile != "" {
			f, err := os.Create(*memprofile)
			if err != nil {
				log.Fatal("could not create memory profile: ", err)
			}
			runtime.GC() // get up-to-date statistics
			if err := pprof.WriteHeapProfile(f); err != nil {
				log.Fatal("could not write memory profile: ", err)
			}
			f.Close()
		}
		return
	}

	waitDuration := *timeout - pass
	wait := time.After(waitDuration)
	res := getServicesResponse(id, reqs, waitDuration)

	// Wait
waitLoop:
	for {
		select {
		// try hard to return results asap
		case resp, ok := <-res:
			if ok {
				// next result
				msg.Numbers = resp
			} else {
				// finished
				break waitLoop
			}
		case <-wait:
			// finished
			break waitLoop
		}
	}

	// Response
	writeJSON(msg, w)
	if *debug {
		log.Println(id, ":", r.URL.String(), "time=", time.Since(start))
	}
}
// getServicesResponse sends requests in parallel and tries to preprocess
// responses as soon as they arrive, within the time budget.
//
// I use a single collect channel here instead of a per-worker result
// channel because I don't want to do multi-channel waits via reflect.
// The rationale is debatable; after all, I don't want to allocate more
// channels, and I assume most of the services will respond in time.
func getServicesResponse(id uint64, reqs []string, timeout time.Duration) <-chan []int {
	wait := time.After(timeout)
	resp := make(chan []int, len(reqs))
	agg := make(chan []int)

	for _, req := range reqs {
		go worker(id, req, timeout, agg)
	}

	go func() {
		numbers := []int{}
		finished := false
		done := 0
		for {
			select {
			case res := <-agg:
				if !finished {
					// Aggregation.
					// I am super lazy here, not removing dups;
					// instead, the serializer skips them.
					numbers = append(numbers, res...)
					// Sorting on every update is not efficient,
					// but ok for demo purposes.
					sort.Ints(numbers)
					// Send a copy: the caller may still be reading the
					// previous snapshot while we append and sort here.
					snapshot := make([]int, len(numbers))
					copy(snapshot, numbers)
					resp <- snapshot
				}
				// once all workers have finished, close channels and exit
				done++
				if done == len(reqs) {
					close(agg)
					if !finished {
						close(resp)
					}
					if *debug {
						log.Println("[DEBUG]", id, ":", "finished processing", finished)
					}
					return
				}
			case <-wait:
				finished = true
				// let the caller proceed
				close(resp)
				// but keep collecting until the worker tail finishes
			}
		}
	}()

	return resp
}
// worker makes a request to the service and sends back the numbers
func worker(id uint64, req string, timeout time.Duration, res chan<- []int) {
	if *debug {
		log.Println("[DEBUG]", id, ":", "start worker", req)
	}

	var body []byte
	start := time.Now()
	msg := &Message{
		Numbers: []int{},
	}

	defer func() {
		if msg.Numbers == nil {
			msg.Numbers = []int{}
		}
		res <- msg.Numbers
		if *debug {
			log.Println("[DEBUG]", id, ":", "finished worker", req, "body", string(body), "time=", time.Since(start))
		}
	}()

	// Get
	client := &http.Client{
		Timeout: timeout,
	}
	resp, err := client.Get(req)
	if err != nil {
		if *debug {
			log.Println("[ERROR]", id, ":", "error fetching", req, ":", err)
		}
		return
	}

	// Read
	body, err = ioutil.ReadAll(resp.Body)
	resp.Body.Close()
	if err != nil {
		if *debug {
			log.Println("[ERROR]", id, ":", "error reading", req, "body:", err)
		}
		return
	}

	// Unmarshal
	if err = json.Unmarshal(body, msg); err != nil {
		if *debug {
			log.Println("[ERROR]", id, ":", "error unmarshalling", req, "body", string(body), ":", err)
		}
		return
	}
}
// writeJSON serializes a Message, skipping duplicates in the sorted array.
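// For example, a sorted input of [1, 1, 2, 3, 3] is written out as
// {"numbers":[1,2,3]}.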
func writeJSON(m *Message, w http.ResponseWriter) {
	w.WriteHeader(http.StatusOK)
	// Sorry, I skip error checking here; whether the writer
	// is buffered is an open question for now.
	w.Write([]byte("{\"numbers\":["))
	if len(m.Numbers) > 0 {
		last := m.Numbers[0]
		w.Write([]byte(strconv.Itoa(last)))
		for _, num := range m.Numbers {
			if num != last {
				w.Write([]byte(","))
				w.Write([]byte(strconv.Itoa(num)))
				last = num
			}
		}
	}
	w.Write([]byte("]}\n"))
}
// ListenAndServe listens on the TCP network address and
// calls Serve to handle requests on incoming connections. If
// srv.Addr is blank, ":http" is used.
func ListenAndServe(srv *http.Server, shutdown <-chan struct{}) error {
	addr := srv.Addr
	if addr == "" {
		addr = ":http"
	}
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	log.Println("Server bound to", addr)
	go handleInterrupt(shutdown, srv, ln)
	return srv.Serve(TCPKeepAliveListener{ln.(*net.TCPListener)})
}

// Graceful shutdown, after https://github.com/tylerb/graceful
func handleInterrupt(interrupt <-chan struct{}, srv *http.Server, listener net.Listener) {
	<-interrupt
	srv.SetKeepAlivesEnabled(false)
	_ = listener.Close()
}
// TCPKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by ListenAndServe and ListenAndServeTLS so
// dead TCP connections (e.g. closing a laptop mid-download) eventually
// go away.
type TCPKeepAliveListener struct {
	*net.TCPListener
}

func (ln TCPKeepAliveListener) Accept() (c net.Conn, err error) {
	tc, err := ln.AcceptTCP()
	if err != nil {
		return
	}
	tc.SetKeepAlive(true)
	tc.SetKeepAlivePeriod(3 * time.Minute)
	return tc, nil
}
// makeShutdownCh returns a channel that can be used for shutdown
// notifications for commands. The channel receives a message on the
// first interrupt or SIGTERM and is then closed.
func makeShutdownCh() <-chan struct{} {
	resultCh := make(chan struct{}, 1)
	signalCh := make(chan os.Signal, 4)
	// Note: SIGKILL (os.Kill) cannot be caught, so there is
	// no point in subscribing to it.
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-signalCh
		signal.Stop(signalCh)
		log.Println("Got interrupt signal...")
		resultCh <- struct{}{}
		close(resultCh)
	}()
	return resultCh
}
func listenSIGQUIT() {
	// signal.Notify does not block on channel sends, so the channel
	// must be buffered to avoid missing a signal.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, syscall.SIGQUIT)
	go func() {
		stacktrace := make([]byte, 5*1024*1024)
		for range signalCh {
			length := runtime.Stack(stacktrace, true)
			fmt.Println(string(stacktrace[:length]))
		}
	}()
}