Skip to content

Instantly share code, notes, and snippets.

@romshark
Last active April 2, 2018 19:39
Show Gist options
  • Save romshark/94120c59e73b6ebc1b6c1f7857f73309 to your computer and use it in GitHub Desktop.
Save romshark/94120c59e73b6ebc1b6c1f7857f73309 to your computer and use it in GitHub Desktop.
A benchmark for WebWire (https://github.com/qbeon/webwire-go)

Webwire Benchmarking Tool

How to use

  1. Start the test server: go run test-server.go
  2. Run the benchmark: go run benchmark.go

The following parameters are available:

  • bench-dur: benchmark duration in seconds (default: 10)
  • addr: address of the target test server like localhost:80 (default: :8081)
  • clients: number of concurrent clients (default: 10)
  • req-timeo: default request timeout in milliseconds (default: 10000)
  • min-req-itv: min interval between each request in milliseconds (default: 250)
  • max-req-itv: max interval between each request in milliseconds (default: 500)
  • min-pld-sz: min request payload size in bytes (default: 32)
  • max-pld-sz: max request payload size in bytes (default: 128)

Example

Here's an example of a 60-second benchmark with 1,000 concurrent connections, each sending requests with a 1 KiB payload at intervals of 10 to 30 milliseconds:

go run benchmark.go -clients 1000 -min-req-itv 10 -max-req-itv 30 -min-pld-sz 1024 -max-pld-sz 1024 -req-timeo 60000 -bench-dur 60

And here are the results of the above benchmark:

2018/04/02 21:20:19   Benchmark finished (60s)

  Requests performed:  1892900
  Requests timed out:  0

  Data sent:           1.81 GiB (1938329600 bytes)
  Data received:       1.81 GiB (1938329600 bytes)
  Avg payload size:    1.00 KiB

  Avg req itv:         19.955008ms
  Max req itv:         29ms
  Min req itv:         10ms

  Avg req time:        9.420078ms
  Max req time:        832.1403ms
  Min req time:        1.0004ms

  Req/s:               31548
  Bytes/s:             32305493
  Throughput:          30.81 MiB/s

System: Intel Core i7-3930K hexa-core @ 3.8 GHz; 64.0 GB DDR3 RAM @ 1833 MHz

package main
import (
"flag"
"fmt"
"log"
"math/rand"
"os"
"sync"
"sync/atomic"
"time"
wwr "github.com/qbeon/webwire-go"
wwrclt "github.com/qbeon/webwire-go/client"
)
/****************************************************************\
CLI Arguments
\****************************************************************/
// Command line flags of the benchmark. The pointers are only meaningful
// after flag.Parse has been called (see parseCliArgs).
var (
	// argServerAddr is the address of the server to benchmark.
	argServerAddr = flag.String("addr", ":8081", "server address")

	// argNumClients is the number of clients running concurrently.
	argNumClients = flag.Uint("clients", 10, "number of concurrent clients")

	// argReqTimeout is the default request timeout in milliseconds.
	argReqTimeout = flag.Uint("req-timeo", 10000, "default request timeout in milliseconds")

	// argMinReqInterval and argMaxReqInterval bound the pause between two
	// requests of a client, in milliseconds.
	argMinReqInterval = flag.Uint("min-req-itv", 250, "min interval between each request in milliseconds")
	argMaxReqInterval = flag.Uint("max-req-itv", 500, "max interval between each request in milliseconds")

	// argMinPayloadSize and argMaxPayloadSize bound the request payload
	// size, in bytes.
	argMinPayloadSize = flag.Uint("min-pld-sz", 32, "min request payload size in bytes")
	argMaxPayloadSize = flag.Uint("max-pld-sz", 128, "max request payload size in bytes")

	// argBenchDur is the duration of the benchmark in seconds.
	argBenchDur = flag.Uint("bench-dur", 10, "benchmark duration in seconds")
)
/****************************************************************\
Utils
\****************************************************************/
// benchmarkTimeoutTriggered is the shutdown flag of the benchmark. It is
// zero while the benchmark is running and must only be accessed through
// the sync/atomic functions.
var benchmarkTimeoutTriggered int32

// benchmarkFinished reports whether the shutdown flag has been raised,
// i.e. whether benchmarkTimeoutTriggered currently holds a positive value.
func benchmarkFinished() bool {
	flag := atomic.LoadInt32(&benchmarkTimeoutTriggered)
	return flag > 0
}
// randomSeedOnce guards the one-time seeding of the shared math/rand
// generator used by random.
var randomSeedOnce sync.Once

// random returns a pseudo-random integer in the inclusive range
// [min, max]. If max is not greater than min, min is returned.
//
// The shared generator is seeded exactly once. Reseeding on every call
// with a second-resolution timestamp would make every call within the
// same second return the same value. The top-level math/rand functions
// are used, so random is safe to call from multiple goroutines.
//
// max-min+1 must fit into an int.
func random(min, max int) int {
	if max <= min {
		return min
	}
	randomSeedOnce.Do(func() {
		rand.Seed(time.Now().UnixNano())
	})
	// +1 makes the upper bound reachable; rand.Intn(n) yields [0, n).
	return min + rand.Intn(max-min+1)
}
// parseCliArgs parses the command line flags and validates them.
//
// Every violated constraint is logged; if at least one was violated the
// process exits with status 1 after all checks have run, so the user sees
// all problems at once.
func parseCliArgs() {
	flag.Parse()

	invalid := false

	// Validate server address
	if len(*argServerAddr) < 1 {
		invalid = true
		log.Printf("INVALID ARGS: invalid server address ('%s')", *argServerAddr)
	}

	// Validate the number of concurrent clients
	if *argNumClients < 1 {
		invalid = true
		log.Printf("INVALID ARGS: number of concurrent clients cannot be zero")
	}

	// Validate request timeout
	if *argReqTimeout < 1 {
		invalid = true
		log.Printf("INVALID ARGS: request timeout cannot be smaller 1 millisecond")
	}

	// Validate request interval range
	if *argMinReqInterval > *argMaxReqInterval {
		invalid = true
		log.Printf(
			"INVALID ARGS: min request interval (%d) grater max parameter (%d)",
			*argMinReqInterval,
			*argMaxReqInterval,
		)
	}

	// Validate payload size range
	if *argMinPayloadSize > *argMaxPayloadSize {
		invalid = true
		log.Printf(
			"INVALID ARGS: min payload size (%d) grater max parameter (%d)",
			*argMinPayloadSize,
			*argMaxPayloadSize,
		)
	}

	// Validate benchmark duration: the per-second statistics divide by it,
	// so a zero duration must be rejected up front.
	if *argBenchDur < 1 {
		invalid = true
		log.Printf("INVALID ARGS: benchmark duration cannot be zero")
	}

	if invalid {
		os.Exit(1)
	}
}
// newPayload builds a request payload with binary encoding. Its length is
// chosen by random from the configured min/max payload size flags and
// every byte is set to 'a'.
func newPayload() wwr.Payload {
	lower, upper := int(*argMinPayloadSize), int(*argMaxPayloadSize)
	body := make([]byte, random(lower, upper))
	for idx := range body {
		body[idx] = 'a'
	}
	return wwr.Payload{Encoding: wwr.EncodingBinary, Data: body}
}
func randomReqIntervalSleep() time.Duration {
return time.Duration(random(int(*argMinReqInterval), int(*argMaxReqInterval))) *
time.Millisecond
}
/****************************************************************\
Stats
\****************************************************************/
var statLock = sync.Mutex{}
var statTotalReqTime = time.Duration(0)
var statMaxReqTime = time.Duration(0)
var statMinReqTime = time.Duration(0)
var statTotalReqInterval = time.Duration(0)
var statMaxReqInterval = time.Duration(0)
var statMinReqInterval = time.Duration(0)
var statTotalReqsPerformed = uint64(0)
var statTotalReqTimeouts = uint64(0)
var statTotalBytesSent = uint64(0)
var statTotalBytesReceived = uint64(0)
// humanReadableDataSize scales a byte count to the largest binary unit
// (KiB, MiB, GiB, TiB, PiB) that keeps the number at or above 1 and
// returns the scaled number together with the unit name.
//
// Counts below 1 KiB are returned unscaled with the unit "B", so small
// values such as a 64 byte average payload are shown as "64.00 B"
// instead of being reported as an invalid size.
func humanReadableDataSize(numBytes float64) (float64, string) {
	const (
		kib = 1 << 10
		mib = 1 << 20
		gib = 1 << 30
		tib = 1 << 40
		pib = 1 << 50
	)
	switch {
	case numBytes >= pib:
		return numBytes / pib, "PiB"
	case numBytes >= tib:
		return numBytes / tib, "TiB"
	case numBytes >= gib:
		return numBytes / gib, "GiB"
	case numBytes >= mib:
		return numBytes / mib, "MiB"
	case numBytes >= kib:
		return numBytes / kib, "KiB"
	}
	return numBytes, "B"
}
func printStats() {
reqPerSec := float64(statTotalReqsPerformed) / float64(*argBenchDur)
bytesPerSec := float64(uint64(statTotalBytesSent) / uint64(*argBenchDur))
avgReqTime := statTotalReqTime / time.Duration(statTotalReqsPerformed)
avgReqInterval := statTotalReqInterval / time.Duration(statTotalReqsPerformed)
avgPayloadSize := statTotalBytesSent / statTotalReqsPerformed
fmt.Println(" ")
log.Printf(" Benchmark finished (%ds)\n", *argBenchDur)
fmt.Println(" ")
fmt.Printf(" Requests performed: %d\n", statTotalReqsPerformed)
fmt.Printf(" Requests timed out: %d\n", statTotalReqTimeouts)
fmt.Println(" ")
dataSent, sentUnits := humanReadableDataSize(float64(statTotalBytesSent))
dataReceived, receivedUnits := humanReadableDataSize(float64(statTotalBytesReceived))
avgPayloadSizeNum, avgPayloadSizeUnits := humanReadableDataSize(float64(avgPayloadSize))
fmt.Printf(" Data sent: %.2f %s (%d bytes)\n",
dataSent,
sentUnits,
statTotalBytesSent,
)
fmt.Printf(" Data received: %.2f %s (%d bytes)\n",
dataReceived,
receivedUnits,
statTotalBytesReceived,
)
fmt.Printf(" Avg payload size: %.2f %s\n", avgPayloadSizeNum, avgPayloadSizeUnits)
fmt.Println(" ")
numPerSec, units := humanReadableDataSize(bytesPerSec)
fmt.Printf(" Avg req itv: %s\n", avgReqInterval)
fmt.Printf(" Max req itv: %s\n", statMaxReqInterval)
fmt.Printf(" Min req itv: %s\n", statMinReqInterval)
fmt.Println(" ")
fmt.Printf(" Avg req time: %s\n", avgReqTime)
fmt.Printf(" Max req time: %s\n", statMaxReqTime)
fmt.Printf(" Min req time: %s\n", statMinReqTime)
fmt.Println(" ")
fmt.Printf(" Req/s: %.0f\n", reqPerSec)
fmt.Printf(" Bytes/s: %.0f\n", bytesPerSec)
fmt.Printf(" Throughput: %.2f %s/s\n", numPerSec, units)
}
// statsAddRequest records one answered request in the shared statistics.
//
// interval is the pause taken before the request, reqTime its round-trip
// time, requestSize and replySize the payload sizes in bytes. All updates
// happen while holding statLock.
//
// The first recorded sample initializes the minimums. Later samples only
// lower them, so a genuine minimum of 0 (e.g. a zero request interval)
// is not overwritten by a later, larger sample.
func statsAddRequest(interval, reqTime time.Duration, requestSize, replySize int) {
	statLock.Lock()
	defer statLock.Unlock()

	statTotalReqsPerformed++
	firstSample := statTotalReqsPerformed == 1

	statTotalBytesSent += uint64(requestSize)
	statTotalBytesReceived += uint64(replySize)

	statTotalReqTime += reqTime
	if firstSample || reqTime < statMinReqTime {
		statMinReqTime = reqTime
	}
	if reqTime > statMaxReqTime {
		statMaxReqTime = reqTime
	}

	statTotalReqInterval += interval
	if firstSample || interval < statMinReqInterval {
		statMinReqInterval = interval
	}
	if interval > statMaxReqInterval {
		statMaxReqInterval = interval
	}
}
// statsAddTimedoutRequest counts one timed-out request by atomically
// incrementing statTotalReqTimeouts.
func statsAddTimedoutRequest() {
	const step uint64 = 1
	atomic.AddUint64(&statTotalReqTimeouts, step)
}
/****************************************************************\
Client
\****************************************************************/
// startClient runs one benchmark client until the benchmark is finished.
//
// In a loop it waits a random interval, sends a request with a fresh
// payload, measures the round-trip time and records the result in the
// statistics. Timed-out requests are counted separately; any other
// request error aborts the program with a panic.
//
// The caller must have called wg.Add(1) for this client BEFORE starting
// the goroutine. startClient only calls wg.Done when it returns.
// Registering inside the goroutine would race with wg.Wait in main.
func startClient(clientIndex uint, wg *sync.WaitGroup) {
	defer wg.Done()

	// Initialize client
	client := wwrclt.NewClient(
		*argServerAddr,
		wwrclt.Options{
			// No hooks required in this example
			Hooks: wwrclt.Hooks{},
			// Default timeout for timed requests
			DefaultRequestTimeout: time.Duration(*argReqTimeout) * time.Millisecond,
		},
	)
	// Deferred after wg.Done, so the client is closed before the wait
	// group is released.
	defer client.Close()

	for !benchmarkFinished() {
		// Wait before sending the next request
		interval := randomReqIntervalSleep()
		time.Sleep(interval)

		payload := newPayload()

		// Capture time of request beginning
		start := time.Now()

		// Send request and await reply
		reply, err := client.Request("", payload)

		// Compute elapsed time since request start
		elapsed := time.Since(start)

		// Investigate request error
		switch err := err.(type) {
		case nil:
		case wwr.ReqTimeoutErr:
			statsAddTimedoutRequest()
			continue
		default:
			panic(fmt.Errorf("ERROR: Unexpected request error: %s", err))
		}

		// Update stats
		statsAddRequest(interval, elapsed, len(payload.Data), len(reply.Data))
	}
}

/****************************************************************\
Main
\****************************************************************/

// main parses the CLI arguments, launches the configured number of
// concurrent clients, raises the shutdown flag once the benchmark
// duration has elapsed, waits for all clients to stop and prints the
// statistics.
func main() {
	parseCliArgs()

	var wg sync.WaitGroup

	// Start clients. Each client is registered with the wait group before
	// its goroutine is started so that wg.Wait below cannot return before
	// every client has finished.
	for i := uint(0); i < *argNumClients; i++ {
		wg.Add(1)
		go startClient(i, &wg)
	}

	// Start benchmark timeout triggering goroutine
	time.AfterFunc(time.Duration(*argBenchDur)*time.Second, func() {
		log.Print("Benchmark timeout triggered")
		atomic.StoreInt32(&benchmarkTimeoutTriggered, 1)
	})

	log.Printf("All clients (%d) operational", *argNumClients)

	wg.Wait()
	printStats()
}
package main
import (
"bufio"
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"runtime"
"syscall"
wwr "github.com/qbeon/webwire-go"
)
/****************************************************************\
Server Implementation
\****************************************************************/
// LoadTestServer is the stateless echo server used as benchmark target.
// It implements the webwire.ServerImplementation interface.
type LoadTestServer struct{}

// OnOptions is part of webwire.ServerImplementation.
// Intentionally a no-op in this example.
func (*LoadTestServer) OnOptions(http.ResponseWriter) {}

// OnSignal is part of webwire.ServerImplementation.
// Intentionally a no-op in this example.
func (*LoadTestServer) OnSignal(context.Context) {}

// OnClientConnected is part of webwire.ServerImplementation.
// Intentionally a no-op in this example.
func (*LoadTestServer) OnClientConnected(*wwr.Client) {}

// OnClientDisconnected is part of webwire.ServerImplementation.
// Intentionally a no-op in this example.
func (*LoadTestServer) OnClientDisconnected(*wwr.Client) {}

// BeforeUpgrade is part of webwire.ServerImplementation.
// It always returns true so that every incoming connection is accepted.
func (*LoadTestServer) BeforeUpgrade(http.ResponseWriter, *http.Request) bool {
	return true
}

// OnRequest is part of webwire.ServerImplementation.
// It echoes the request: the reply carries the payload (data and
// encoding) of the message found in ctx under the wwr.Msg key.
func (*LoadTestServer) OnRequest(ctx context.Context) (response wwr.Payload, err error) {
	request := ctx.Value(wwr.Msg).(wwr.Message)
	response = request.Payload
	return response, nil
}
/****************************************************************\
Utils
\****************************************************************/
// listenForOsSignals blocks until the process receives an interrupt or
// termination signal and then shuts the server down.
//
// A successful shutdown is logged as graceful termination. If Shutdown
// returns an error, only the error is logged.
func listenForOsSignals(server *wwr.HeadedServer) {
	// Buffered so that a signal delivered before the receive is not lost.
	osSignals := make(chan os.Signal, 1)
	// os.Interrupt is syscall.SIGINT, so it is listed only once.
	signal.Notify(osSignals, os.Interrupt, syscall.SIGTERM)

	sig := <-osSignals
	log.Printf("Termination demanded by the OS (%s), shutting down...", sig)

	if err := server.Shutdown(); err != nil {
		log.Printf("Error during server shutdown: %s", err)
		return
	}
	log.Println("Server gracefully terminated")
}
// startUserInterface runs a minimal interactive prompt on standard input.
//
// Supported commands:
//
//	gc  manually trigger the garbage collector
//
// Anything else is reported as an invalid command. The loop ends when
// standard input can no longer be read (e.g. it was closed): the error is
// logged once and the function returns instead of retrying forever.
func startUserInterface() {
	stdin := bufio.NewReader(os.Stdin)
	for {
		fmt.Print("> ")

		line, err := stdin.ReadBytes('\n')
		if err != nil {
			// Check the error before touching line: on EOF line may be
			// empty and must not be sliced.
			log.Print(err)
			return
		}

		// ReadBytes succeeded, so line ends with '\n'. Drop it together
		// with a preceding '\r' from CRLF line endings.
		line = line[:len(line)-1]
		if n := len(line); n > 0 && line[n-1] == '\r' {
			line = line[:n-1]
		}

		switch command := string(line); command {
		// Manually trigger the garbage collector
		case "gc":
			log.Print("Initiating garbage collection...")
			runtime.GC()
		default:
			log.Printf("Invalid command: '%s'", command)
		}
	}
}
/****************************************************************\
Main
\****************************************************************/
// serverAddr is the address the test server listens on (flag "addr").
var serverAddr = flag.String("addr", ":8081", "server address")

// main sets up a headed webwire echo server on the configured address,
// starts the OS signal listener and the interactive prompt in the
// background, and runs the server. Setup and run failures abort the
// program with a panic.
func main() {
	flag.Parse()

	// Warnings go to stdout, errors to stderr.
	options := wwr.HeadedServerOptions{
		ServerAddress: *serverAddr,
		ServerOptions: wwr.ServerOptions{
			WarnLog:  os.Stdout,
			ErrorLog: os.Stderr,
		},
	}

	server, err := wwr.NewHeadedServer(&LoadTestServer{}, options)
	if err != nil {
		panic(fmt.Errorf("Failed setting up WebWire server: %s", err))
	}
	log.Printf("Listening on %s", server.Addr().String())

	// Background helpers: shutdown on OS signal, interactive prompt.
	go listenForOsSignals(server)
	go startUserInterface()

	// Launch echo server
	runErr := server.Run()
	if runErr != nil {
		panic(fmt.Errorf("WebWire server failed: %s", runErr))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment