|
package main |
|
|
|
import ( |
|
"flag" |
|
"fmt" |
|
"log" |
|
"math/rand" |
|
"os" |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
wwr "github.com/qbeon/webwire-go" |
|
wwrclt "github.com/qbeon/webwire-go/client" |
|
) |
|
|
|
/****************************************************************\
	CLI Arguments
\****************************************************************/
|
|
|
// Command line flags of the benchmark. The pointers are registered at package
// initialization; their values are only meaningful after flag.Parse has run.
var (
	// argServerAddr is the address of the server to benchmark.
	argServerAddr = flag.String("addr", ":8081", "server address")

	// argNumClients is the number of clients running concurrently.
	argNumClients = flag.Uint("clients", 10, "number of concurrent clients")

	// argReqTimeout is the default request timeout in milliseconds.
	argReqTimeout = flag.Uint("req-timeo", 10000, "default request timeout in milliseconds")

	// argMinReqInterval and argMaxReqInterval bound the pause a client makes
	// before each request, in milliseconds.
	argMinReqInterval = flag.Uint("min-req-itv", 250, "min interval between each request in milliseconds")
	argMaxReqInterval = flag.Uint("max-req-itv", 500, "max interval between each request in milliseconds")

	// argMinPayloadSize and argMaxPayloadSize bound the size of a request
	// payload, in bytes.
	argMinPayloadSize = flag.Uint("min-pld-sz", 32, "min request payload size in bytes")
	argMaxPayloadSize = flag.Uint("max-pld-sz", 128, "max request payload size in bytes")

	// argBenchDur is the duration of the whole benchmark in seconds.
	argBenchDur = flag.Uint("bench-dur", 10, "benchmark duration in seconds")
)
|
|
|
/****************************************************************\
	Utils
\****************************************************************/
|
|
|
// benchmarkTimeoutTriggered is the shutdown flag of the benchmark. It starts
// at zero and must only be accessed through the sync/atomic functions.
var benchmarkTimeoutTriggered int32

// benchmarkFinished reports whether the shutdown flag currently holds a
// positive value.
func benchmarkFinished() bool {
	flagValue := atomic.LoadInt32(&benchmarkTimeoutTriggered)
	return flagValue > 0
}
|
|
|
// randLock serializes access to randSrc: a *rand.Rand is not safe for
// concurrent use and random is called from every client goroutine.
var randLock sync.Mutex

// randSrc is the generator behind random. It is seeded exactly once, at
// program start. Re-seeding from the wall clock on every call (as was done
// before) makes all calls within the same second return the same value.
var randSrc = rand.New(rand.NewSource(time.Now().UnixNano()))

// random returns a pseudo-random integer from the half-open range [min, max).
//
// If the range is empty (max <= min), min is returned instead of panicking.
// It is safe for concurrent use.
func random(min, max int) int {
	if max <= min {
		return min
	}

	randLock.Lock()
	defer randLock.Unlock()
	return randSrc.Intn(max-min) + min
}
|
|
|
func parseCliArgs() { |
|
// Parse command line arguments |
|
flag.Parse() |
|
err := false |
|
|
|
// Validate server address |
|
if len(*argServerAddr) < 1 { |
|
err = true |
|
log.Printf("INVALID ARGS: invalid server address ('%s')", *argServerAddr) |
|
} |
|
|
|
// Validate the number of concurrent clients |
|
if *argNumClients < 1 { |
|
err = true |
|
log.Printf("INVALID ARGS: number of concurrent clients cannot be zero") |
|
} |
|
|
|
// Validate request timeout |
|
if *argReqTimeout < 1 { |
|
err = true |
|
log.Printf("INVALID ARGS: request timeout cannot be smaller 1 millisecond") |
|
} |
|
|
|
// Validate request interval range |
|
if *argMinReqInterval > *argMaxReqInterval { |
|
err = true |
|
log.Printf( |
|
"INVALID ARGS: min request interval (%d) grater max parameter (%d)", |
|
*argMinReqInterval, |
|
*argMaxReqInterval, |
|
) |
|
} |
|
|
|
// Validate payload size range |
|
if *argMinPayloadSize > *argMaxPayloadSize { |
|
err = true |
|
log.Printf( |
|
"INVALID ARGS: min payload size (%d) grater max parameter (%d)", |
|
*argMinPayloadSize, |
|
*argMaxPayloadSize, |
|
) |
|
} |
|
|
|
if err { |
|
os.Exit(1) |
|
} |
|
} |
|
|
|
func newPayload() wwr.Payload { |
|
size := random(int(*argMinPayloadSize), int(*argMaxPayloadSize)) |
|
data := make([]byte, size) |
|
|
|
for i := 0; i < size; i++ { |
|
data[i] = 'a' |
|
} |
|
|
|
// Determine a payload to be sent to the server, use default binary encoding |
|
return wwr.Payload{ |
|
Encoding: wwr.EncodingBinary, |
|
Data: data, |
|
} |
|
} |
|
|
|
func randomReqIntervalSleep() time.Duration { |
|
return time.Duration(random(int(*argMinReqInterval), int(*argMaxReqInterval))) * |
|
time.Millisecond |
|
} |
|
|
|
/****************************************************************\
	Stats
\****************************************************************/
|
|
|
// statLock guards the statistics that statsAddRequest updates.
var statLock sync.Mutex

// Request round-trip time statistics.
var (
	statTotalReqTime time.Duration
	statMaxReqTime   time.Duration
	statMinReqTime   time.Duration
)

// Statistics about the pause taken before each request.
var (
	statTotalReqInterval time.Duration
	statMaxReqInterval   time.Duration
	statMinReqInterval   time.Duration
)

// Request and traffic counters. statTotalReqTimeouts is updated through
// sync/atomic by statsAddTimedoutRequest rather than under statLock.
var (
	statTotalReqsPerformed uint64
	statTotalReqTimeouts   uint64
	statTotalBytesSent     uint64
	statTotalBytesReceived uint64
)
|
|
|
// humanReadableDataSize scales a byte count to the largest binary unit
// (KiB, MiB, GiB, TiB, PiB) that keeps the number at or above 1 and returns
// the scaled number together with the unit name.
//
// Counts below 1024 are returned unchanged with the unit "B"; previously they
// were reported as (0, "NaN"), which hid every small value.
func humanReadableDataSize(numBytes float64) (float64, string) {
	const (
		kib = 1024.0
		mib = kib * 1024
		gib = mib * 1024
		tib = gib * 1024
		pib = tib * 1024
	)

	switch {
	case numBytes >= pib:
		return numBytes / pib, "PiB"
	case numBytes >= tib:
		return numBytes / tib, "TiB"
	case numBytes >= gib:
		return numBytes / gib, "GiB"
	case numBytes >= mib:
		return numBytes / mib, "MiB"
	case numBytes >= kib:
		return numBytes / kib, "KiB"
	}
	return numBytes, "B"
}
|
|
|
func printStats() { |
|
reqPerSec := float64(statTotalReqsPerformed) / float64(*argBenchDur) |
|
bytesPerSec := float64(uint64(statTotalBytesSent) / uint64(*argBenchDur)) |
|
|
|
avgReqTime := statTotalReqTime / time.Duration(statTotalReqsPerformed) |
|
avgReqInterval := statTotalReqInterval / time.Duration(statTotalReqsPerformed) |
|
avgPayloadSize := statTotalBytesSent / statTotalReqsPerformed |
|
|
|
fmt.Println(" ") |
|
log.Printf(" Benchmark finished (%ds)\n", *argBenchDur) |
|
fmt.Println(" ") |
|
fmt.Printf(" Requests performed: %d\n", statTotalReqsPerformed) |
|
fmt.Printf(" Requests timed out: %d\n", statTotalReqTimeouts) |
|
fmt.Println(" ") |
|
|
|
dataSent, sentUnits := humanReadableDataSize(float64(statTotalBytesSent)) |
|
dataReceived, receivedUnits := humanReadableDataSize(float64(statTotalBytesReceived)) |
|
avgPayloadSizeNum, avgPayloadSizeUnits := humanReadableDataSize(float64(avgPayloadSize)) |
|
|
|
fmt.Printf(" Data sent: %.2f %s (%d bytes)\n", |
|
dataSent, |
|
sentUnits, |
|
statTotalBytesSent, |
|
) |
|
fmt.Printf(" Data received: %.2f %s (%d bytes)\n", |
|
dataReceived, |
|
receivedUnits, |
|
statTotalBytesReceived, |
|
) |
|
fmt.Printf(" Avg payload size: %.2f %s\n", avgPayloadSizeNum, avgPayloadSizeUnits) |
|
fmt.Println(" ") |
|
|
|
numPerSec, units := humanReadableDataSize(bytesPerSec) |
|
|
|
fmt.Printf(" Avg req itv: %s\n", avgReqInterval) |
|
fmt.Printf(" Max req itv: %s\n", statMaxReqInterval) |
|
fmt.Printf(" Min req itv: %s\n", statMinReqInterval) |
|
fmt.Println(" ") |
|
|
|
fmt.Printf(" Avg req time: %s\n", avgReqTime) |
|
fmt.Printf(" Max req time: %s\n", statMaxReqTime) |
|
fmt.Printf(" Min req time: %s\n", statMinReqTime) |
|
fmt.Println(" ") |
|
|
|
fmt.Printf(" Req/s: %.0f\n", reqPerSec) |
|
fmt.Printf(" Bytes/s: %.0f\n", bytesPerSec) |
|
fmt.Printf(" Throughput: %.2f %s/s\n", numPerSec, units) |
|
} |
|
|
|
func statsAddRequest(interval, reqTime time.Duration, requestSize, replySize int) { |
|
statLock.Lock() |
|
|
|
statTotalReqsPerformed++ |
|
statTotalBytesSent += uint64(requestSize) |
|
statTotalBytesReceived += uint64(replySize) |
|
|
|
statTotalReqTime += reqTime |
|
if statMinReqTime == 0 || reqTime < statMinReqTime { |
|
statMinReqTime = reqTime |
|
} |
|
if reqTime > statMaxReqTime { |
|
statMaxReqTime = reqTime |
|
} |
|
|
|
statTotalReqInterval += interval |
|
if statMinReqInterval == 0 || interval < statMinReqInterval { |
|
statMinReqInterval = interval |
|
} |
|
if interval > statMaxReqInterval { |
|
statMaxReqInterval = interval |
|
} |
|
|
|
statLock.Unlock() |
|
} |
|
|
|
func statsAddTimedoutRequest() { |
|
atomic.AddUint64(&statTotalReqTimeouts, 1) |
|
} |
|
|
|
/****************************************************************\
	Client
\****************************************************************/
|
|
|
func startClient(clientIndex uint, wg *sync.WaitGroup) { |
|
wg.Add(1) |
|
|
|
// Initialize client |
|
client := wwrclt.NewClient( |
|
*argServerAddr, |
|
wwrclt.Options{ |
|
// No hooks required in this example |
|
Hooks: wwrclt.Hooks{}, |
|
// Default timeout for timed requests |
|
DefaultRequestTimeout: time.Duration(*argReqTimeout) * time.Millisecond, |
|
}, |
|
) |
|
|
|
for { |
|
// Check for shutdown |
|
if benchmarkFinished() { |
|
break |
|
} |
|
|
|
// Wait before sending the next request |
|
interval := randomReqIntervalSleep() |
|
time.Sleep(interval) |
|
|
|
payload := newPayload() |
|
|
|
// Capture time of request begining |
|
start := time.Now() |
|
|
|
// Send request and await reply |
|
reply, err := client.Request("", payload) |
|
|
|
// Compute elapsed time since request start |
|
elapsed := time.Since(start) |
|
|
|
// Investigate request error |
|
switch err := err.(type) { |
|
case nil: |
|
case wwr.ReqTimeoutErr: |
|
statsAddTimedoutRequest() |
|
continue |
|
default: |
|
panic(fmt.Errorf("ERROR: Unexpected request error: %s", err)) |
|
} |
|
|
|
// Update stats |
|
statsAddRequest(interval, elapsed, len(payload.Data), len(reply.Data)) |
|
} |
|
|
|
client.Close() |
|
|
|
wg.Done() |
|
} |
|
|
|
/****************************************************************\ |
|
Main |
|
\****************************************************************/ |
|
|
|
func main() { |
|
parseCliArgs() |
|
var wg sync.WaitGroup |
|
|
|
// Start clients |
|
for i := uint(0); i < *argNumClients; i++ { |
|
go startClient(i, &wg) |
|
} |
|
|
|
// Start benchmark timeout triggering goroutine |
|
time.AfterFunc(time.Duration(*argBenchDur)*time.Second, func() { |
|
log.Print("Benchmark timeout triggered") |
|
atomic.StoreInt32(&benchmarkTimeoutTriggered, 1) |
|
}) |
|
|
|
log.Printf("All clients (%d) operational", *argNumClients) |
|
wg.Wait() |
|
|
|
printStats() |
|
} |