Instantly share code, notes, and snippets.

@bbengfort /bench.go
Last active Sep 26, 2018

Embed
What would you like to do?
Blast throughput in Go.
// Blast holds the outcome of a single blast benchmark run: how many requests
// succeeded and failed, when the run began, how long it took overall, and the
// latency observed for each individual request.
type Blast struct {
	// requests counts the requests that completed without an error.
	requests uint64

	// failures counts the requests that completed with an error.
	failures uint64

	// started is the instant at which the benchmark began.
	started time.Time

	// duration is the length of the benchmark period.
	duration time.Duration

	// latencies stores one observed latency per request issued.
	latencies []time.Duration
}
// Run N operations against the server at addr by putting a unique key with
// random values of size S. Returns a Benchmark object whose objects can be
// written out to a JSON lines file.
func RunBlast(addr string, N, S, uint) (bench *Blast, err error) {
bench = new(Blast)
bench.latencies = make([]time.Duration, N)
// Pre-allocate the requests and results arrays
results := make([]error, N)
requests := make([]*PutRequest, N)
// Create the wait group for all requests
group := new(sync.WaitGroup)
group.Add(int(N))
// Initialize the requests ahead of time so that generating keys and
// values is not part of the throughput computation.
for i := uint(0); i < N; i++ {
// Create the request to send to the server
requests[i] = &PutRequest{
Key: fmt.Sprintf("%04X", i),
Val: make([]byte, S)
}
// Create a random value of size S
rand.Read(requests[i])
}
// Initialize a client per operation in an array and connect to server.
// NOTE: done separately from requests loop to ensure connections are
// open for as little time as possible.
//
// NOTE: this is the source of an error discussed below, these lines of
// code should be replaced by a single client, through which all requests
// are issued (or possibly a few clients that are used in a round-robin
// fashion by the request threads).
clients := make([]KVClient, N)
for i := uint(0); i < N; i++ {
// Dial the grpc server
var conn *grpc.ClientConn
if conn, err = grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(10 * time.Second)); err != nil {
return fmt.Errorf("could not create client %d: %s", i, err)
}
defer conn.Close()
// Create the client for the grpc service
clients[i] = NewKVClient(conn)
}
// Execute the blast operations against the server
bench.started = time.Now()
for i := uint(0); i < N; i++ {
go func(k uint) {
// Create the context for the request
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
// Make the request to the server and store the error in results
start := time.Now()
_, results[k] = clients[k].Put(reqs[k])
// Record the latency of the result and finish
bench.latencies[k] = time.Since(start)
group.Done()
}(i)
}
// Wait for all requests to finish, then record duration
group.Wait()
bench.duration = time.Since(bench.started)
// Compute successes and failures
for _, r := range results {
if r == nil {
bench.requests++
} else {
bench.failures++
}
}
return bench, nil
}
// Throughput reports the rate of successful requests in operations per
// second: the count of successful requests (failures are not included)
// divided by the total duration of the run in seconds. A run whose duration
// is zero yields 0 rather than dividing by zero.
func (b *Blast) Throughput() float64 {
	if b.duration != 0 {
		elapsed := b.duration.Seconds()
		return float64(b.requests) / elapsed
	}
	return 0.0
}
// Latencies returns the per-request latencies recorded during the run. The
// slice is handed back as-is rather than copied, so it shares its backing
// storage with the Blast; callers should treat it as read-only.
func (b *Blast) Latencies() []time.Duration {
	observed := b.latencies
	return observed
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment