Skip to content

Instantly share code, notes, and snippets.

@nickstenning
Created November 17, 2023 21:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nickstenning/f45223aa282dabd2638a125446b00026 to your computer and use it in GitHub Desktop.
Save nickstenning/f45223aa282dabd2638a125446b00026 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"net/http"
"net/http/httptrace"
"os"
"os/signal"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/go-retryablehttp"
"github.com/replicate/go/httpclient"
)
const SingleRequestTimeout = 5 * time.Second
type Stats struct {
Successes atomic.Uint64
Failures atomic.Uint64
}
func main() {
client := NewForwarderHTTPClient()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
s := Stats{}
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error {
<-c
return fmt.Errorf("SIGINT")
})
for i := 0; i < 5; i++ {
g.Go(func() error { return worker(ctx, &s, client) })
}
g.Wait()
su := s.Successes.Load()
fa := s.Failures.Load()
fr := float64(fa) / float64(su)
fmt.Printf("successes = %d, failures = %d, ratio = %f\n", su, fa, fr)
}
func worker(baseCtx context.Context, s *Stats, client *http.Client) error {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
fmt.Printf("got conn: %+v\n", connInfo)
},
}
for {
req, _ := http.NewRequestWithContext(baseCtx, http.MethodGet, "http://localhost:8088", nil)
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
ctx, _ := context.WithDeadline(req.Context(), time.Now().Add(15*time.Second))
req = req.WithContext(ctx)
resp, err := client.Do(req)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
fmt.Printf("request error: %s\n", err.Error())
s.Failures.Add(1)
continue
}
defer resp.Body.Close()
var b bytes.Buffer
if n, err := io.Copy(&b, resp.Body); err != nil {
return err
} else {
log.Printf("request ok: read %d response bytes", n)
s.Successes.Add(1)
}
}
return nil
}
func NewForwarderHTTPClient() *http.Client {
retryClient := &retryablehttp.Client{
HTTPClient: httpclient.DefaultPooledClient(),
Logger: nil, // "logging" is provided by OTel transport on the web client
RetryWaitMin: 100 * time.Millisecond,
RetryWaitMax: 2 * time.Second,
RetryMax: 2,
CheckRetry: ForwarderRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}
retryClient.HTTPClient.Timeout = SingleRequestTimeout
return retryClient.StandardClient()
}
func ForwarderRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
// do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
return false, ctx.Err()
}
// Return 429s to the client without retrying, so as not to consume any more
// than one "token" from their token bucket at the remote end.
if err == nil && resp != nil && resp.StatusCode == http.StatusTooManyRequests {
return false, nil
}
return retryablehttp.DefaultRetryPolicy(ctx, resp, err)
}
module blah.com/v1
go 1.20
require (
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-retryablehttp v0.7.5 // indirect
github.com/replicate/go v0.0.0-20231116151243-209e6ed12231 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/sync v0.5.0 // indirect
)
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.5 h1:bJj+Pj19UZMIweq/iie+1u5YCdGrnxCT9yvm0e+Nd5M=
github.com/hashicorp/go-retryablehttp v0.7.5/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/replicate/go v0.0.0-20231116151243-209e6ed12231 h1:9rBpVKsoNidxZo0f1aYlQBLjOut3KpwODaKLj/Q6/gk=
github.com/replicate/go v0.0.0-20231116151243-209e6ed12231/go.mod h1:2I5B6aPJgvVLDBllab+CpeEbb3BSO+24BRe60wBDryM=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 h1:x8Z78aZx8cOF0+Kkazoc7lwUNMGy0LrzEMxTm4BbTxg=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0/go.mod h1:62CPTSry9QZtOaSsE3tOzhx6LzDhHnXJ6xHeMNNiM6Q=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
package main
import (
"bytes"
"context"
"fmt"
"io"
"log"
"net/http"
"net/http/httptrace"
"time"
"github.com/hashicorp/go-retryablehttp"
"github.com/replicate/go/httpclient"
)
func NewForwarderHTTPClient() *http.Client {
retryClient := &retryablehttp.Client{
HTTPClient: httpclient.DefaultPooledClient(),
Logger: nil, // "logging" is provided by OTel transport on the web client
RetryWaitMin: 100 * time.Millisecond,
RetryWaitMax: 2 * time.Second,
RetryMax: 2,
CheckRetry: ForwarderRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}
retryClient.HTTPClient.Timeout = 10 * time.Second
return retryClient.StandardClient()
}
func ForwarderRetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error) {
// do not retry on context.Canceled or context.DeadlineExceeded
if ctx.Err() != nil {
return false, ctx.Err()
}
// Return 429s to the client without retrying, so as not to consume any more
// than one "token" from their token bucket at the remote end.
if err == nil && resp != nil && resp.StatusCode == http.StatusTooManyRequests {
return false, nil
}
return retryablehttp.DefaultRetryPolicy(ctx, resp, err)
}
var globalClient = NewForwarderHTTPClient()
func handleFunc(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
fmt.Printf("got conn: %+v\n", connInfo)
},
}
ctx = httptrace.WithClientTrace(ctx, trace)
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
makeRequest(ctx)
makeRequest(ctx)
makeRequest(ctx)
w.Write([]byte("OK"))
}
func makeRequest(ctx context.Context) {
// req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:8088", nil)
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8088", nil)
// resp, err := globalClient.Do(req)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("request error: %s\n", err.Error())
return
}
defer resp.Body.Close()
var b bytes.Buffer
if n, err := io.Copy(&b, resp.Body); err != nil {
log.Printf("io error: %s\n", err.Error())
} else {
log.Printf("request ok: read %d response bytes", n)
}
}
func main() {
http.DefaultClient.Timeout = 5 * time.Second
s := &http.Server{
Addr: "127.0.0.1:8089",
Handler: http.HandlerFunc(handleFunc),
// ReadTimeout: 2 * time.Second,
// WriteTimeout: 2 * time.Second,
}
log.Fatalln(s.ListenAndServe())
}
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"log"
"net"
)
// main serves as the program entry point
func main() {
// create a tcp listener on the given port
listener, err := net.Listen("tcp", "127.0.0.1:8088")
if err != nil {
log.Fatalln("failed to create listener, err:", err)
}
fmt.Printf("listening on %s\n", listener.Addr())
// listen for new connections
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("failed to accept connection, err:", err)
continue
}
fmt.Printf("accepted connection from %s\n", conn.RemoteAddr())
go handleConnection(conn)
}
}
// handleConnection handles the lifetime of a connection
func handleConnection(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
var reqbuf bytes.Buffer
i := 0
for {
// read client request data
bytes, err := reader.ReadBytes(byte('\n'))
if err != nil {
if err != io.EOF {
fmt.Println("failed to read data, err:", err)
}
fmt.Printf("connection closed by client: %s\n", conn.RemoteAddr())
return
}
reqbuf.Write(bytes)
if string(bytes) == "\r\n" {
reqbuf.Reset()
if i < 2 {
handleRequest(conn)
}
i++
if i == 2 {
fmt.Println("served 2 requests: now ignoring all requests")
}
}
}
}
func handleRequest(conn net.Conn) {
fmt.Fprintf(conn, "HTTP/1.1 200 OK\r\n")
fmt.Fprintf(conn, "Content-Length: 12\r\n")
fmt.Fprintf(conn, "Content-Type: text/plain\r\n")
fmt.Fprintf(conn, "\r\n")
fmt.Fprintf(conn, "Hello World!")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment