Last active
April 26, 2023 08:36
-
-
Save erikdubbelboer/fe4095419fca55e2c92b3d0432ccd7fc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"fmt" | |
"net" | |
"runtime" | |
"sync" | |
"sync/atomic" | |
"time" | |
"github.com/valyala/fasthttp" | |
) | |
var ( | |
// Important to do these conversions only once an not | |
// every time we need them. | |
strSlashA = []byte("/a") | |
strSlashB = []byte("/b") | |
strDefaultBody = []byte("test") | |
strDestURLSlashA = []byte("http://127.0.0.1:1337/a") | |
strDestURLSlashB = []byte("http://127.0.0.1:1337/b") | |
client = &fasthttp.Client{ | |
NoDefaultUserAgentHeader: true, // Don't send: User-Agent: fasthttp | |
MaxConnsPerHost: 10000, | |
ReadBufferSize: 4096, // Make sure to set this big enough that your whole request can be read at once. | |
WriteBufferSize: 4096, // Same but for your response. | |
ReadTimeout: time.Second, | |
WriteTimeout: time.Second, | |
MaxIdleConnDuration: time.Minute, | |
DisableHeaderNamesNormalizing: true, // If you set the case on your headers correctly you can enable this. | |
} | |
// Put everything in pools to prevent garbage. | |
bytesPool = sync.Pool{ | |
New: func() interface{} { | |
b := make([]byte, 0) | |
return &b | |
}, | |
} | |
responsePool = sync.Pool{ | |
New: func() interface{} { | |
return make(chan *fasthttp.Response) | |
}, | |
} | |
// Our request counters. | |
aRequests int64 | |
bRequests int64 | |
) | |
func handler(ctx *fasthttp.RequestCtx) { | |
path := ctx.Path() | |
// Requests to /a will trigger 10 requests to /b | |
if bytes.HasPrefix(path, strSlashA) { | |
atomic.AddInt64(&aRequests, 1) | |
// This is the proper way to handle a []byte in a pool. | |
_combinedBodies := bytesPool.Get().(*[]byte) | |
combinedBodies := (*_combinedBodies)[:0] | |
defer func() { | |
*_combinedBodies = combinedBodies | |
bytesPool.Put(_combinedBodies) | |
}() | |
c := responsePool.Get().(chan *fasthttp.Response) | |
defer responsePool.Put(c) | |
for i := 0; i < 10; i++ { | |
go func() { | |
req := fasthttp.AcquireRequest() | |
res := fasthttp.AcquireResponse() | |
req.SetRequestURIBytes(strDestURLSlashB) | |
if err := fasthttp.Do(req, res); err != nil { | |
println(err.Error()) | |
} | |
fasthttp.ReleaseRequest(req) | |
// Don't release res here, that will be done at the other side of the channel. | |
c <- res | |
}() | |
} | |
// We know we started 10 goroutines, so also read 10 responses from the channel. | |
for i := 0; i < 10; i++ { | |
res := <-c | |
if res.StatusCode() != fasthttp.StatusOK { | |
println(res.StatusCode()) | |
} else { | |
// Combine all response bodies. | |
combinedBodies = append(combinedBodies, res.Body()...) | |
fasthttp.ReleaseResponse(res) | |
} | |
} | |
// Send back all 10 response bodies combined. | |
ctx.SetBody(combinedBodies) | |
ctx.Response.SetStatusCode(fasthttp.StatusOK) | |
} else if bytes.HasPrefix(path, strSlashB) { | |
atomic.AddInt64(&bRequests, 1) | |
// Just send some default body. | |
ctx.SetBody(strDefaultBody) | |
ctx.Response.SetStatusCode(fasthttp.StatusOK) | |
} else { | |
ctx.Response.SetStatusCode(fasthttp.StatusNotFound) | |
} | |
} | |
func main() { | |
// Completely disable memory profiling if we aren't going to use it. | |
// If we don't do this the profiler will take a sample every 0.5MiB bytes allocated. | |
runtime.MemProfileRate = 0 | |
ln, err := net.Listen("tcp4", "127.0.0.1:1337") | |
if err != nil { | |
panic(err) | |
} | |
defer ln.Close() | |
s := &fasthttp.Server{ | |
Handler: handler, | |
NoDefaultServerHeader: true, // Don't send Server: fasthttp | |
ReadBufferSize: 4096, // Make sure these are big enough. | |
WriteBufferSize: 4096, | |
ReadTimeout: time.Second, | |
WriteTimeout: time.Second, | |
IdleTimeout: time.Minute, // This can be long for keep-alive connections. | |
DisableHeaderNamesNormalizing: true, // If you're not going to look at headers or know the casing you can set this. | |
NoDefaultContentType: true, // Don't send Content-Type: text/plain if no Content-Type is set manually. | |
} | |
go func() { | |
if err := s.Serve(ln); err != nil { | |
panic(err) | |
} | |
}() | |
for i := 0; i < 4; i++ { | |
go func() { | |
for { | |
req := fasthttp.AcquireRequest() | |
res := fasthttp.AcquireResponse() | |
req.SetRequestURIBytes(strDestURLSlashA) | |
if err := client.Do(req, res); err != nil { | |
println(err.Error()) | |
} | |
if len(res.Body()) != 10*len(strDefaultBody) { | |
println(string(res.Body())) | |
} | |
fasthttp.ReleaseRequest(req) | |
fasthttp.ReleaseResponse(res) | |
} | |
}() | |
} | |
// Print the request counters every second. | |
for { | |
time.Sleep(time.Second) | |
a := atomic.SwapInt64(&aRequests, 0) | |
b := atomic.SwapInt64(&bRequests, 0) | |
fmt.Println(a, b, runtime.NumGoroutine()) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment