Skip to content

Instantly share code, notes, and snippets.

@dallasmarlow
Last active May 2, 2019 20:08
Show Gist options
  • Save dallasmarlow/254b07d4f7df1eb0550997fcc3fa00ba to your computer and use it in GitHub Desktop.
Save dallasmarlow/254b07d4f7df1eb0550997fcc3fa00ba to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"crypto/tls"
"crypto/x509"
"flag"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/garyburd/redigo/redis"
"github.com/google/uuid"
histogram "github.com/vividcortex/gohistogram"
)
// Command-line flags that shape a benchmark run.
var (
	enableTLS   = flag.Bool("tls", false, "Enable TLS")
	enableTSV   = flag.Bool("tsv", false, "Enable TSV output")
	enableStats = flag.Bool("stats", false, "Enable stats")
	sampleRate  = flag.Int("sampleRate", 1000, "Sample rate for TSV output")
	offset      = flag.Int("offset", 0, "Offset for key ids")
	records     = flag.Int("records", 10000000, "Number of records to set")
	clients     = flag.Int("clients", 50, "Number of clients")
	timeout     = flag.Duration("timeout", 1*time.Second, "Client timeout")
	mode        = flag.String("mode", "default", "Test mode")
)

// Latency histograms, one per workload; each is created with
// histogram.NewHistogram(40).
var (
	setStats     = histogram.NewHistogram(40)
	getStats     = histogram.NewHistogram(40)
	connectStats = histogram.NewHistogram(40)
)

// tsvBuf collects sampled TSV rows; run prints its contents and resets it
// after each workload.
var tsvBuf bytes.Buffer

// hostname is assigned in main and written as the first column of every
// TSV row.
var hostname string

// Endpoint and credentials of the Redis server under test.
var (
	network  = "tcp"
	address  = "xxx:11539"
	tlsHost  = "xxx"
	password = "xxx"
)

// TLS material. The file names in certs are joined onto certPath by config.
var (
	certPath = "/opt/certs"
	certs    = map[string]string{
		"cert": "user.crt",
		"key":  "user.key",
		"ca":   "ca.pem",
	}
)
// connect dials the server at the package-level network/address using the
// configured password and applies the -timeout value to the connect, write
// and read timeouts. When -tls is set, the TLS dial options (including cfg)
// are added as well; otherwise cfg is not used. It panics if the dial fails.
//
// NOTE(review): DialTLSSkipVerify(true) is passed together with cfg, which
// carries RootCAs and ServerName — presumably this turns off server
// certificate verification; confirm against the redigo documentation.
func connect(cfg *tls.Config) redis.Conn {
	dialOpts := []redis.DialOption{
		redis.DialPassword(password),
		redis.DialConnectTimeout(*timeout),
		redis.DialWriteTimeout(*timeout),
		redis.DialReadTimeout(*timeout),
	}
	if *enableTLS {
		dialOpts = append(dialOpts,
			redis.DialUseTLS(true),
			redis.DialTLSSkipVerify(true),
			redis.DialTLSConfig(cfg),
		)
	}

	c, dialErr := redis.Dial(network, address, dialOpts...)
	if dialErr != nil {
		panic(dialErr)
	}
	return c
}
// config builds the client TLS configuration from the files named in certs,
// resolved relative to certPath: the client certificate/key pair and the CA
// bundle. ServerName is set to tlsHost.
//
// It panics if the key pair cannot be loaded or the CA file cannot be read.
// If the CA file is readable but contains no parseable certificate, a
// warning is logged (the returned RootCAs pool is then empty) rather than
// silently ignoring the failure.
func config() *tls.Config {
	certFile := filepath.Join(certPath, certs["cert"])
	keyFile := filepath.Join(certPath, certs["key"])
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		panic(err)
	}

	caFile := filepath.Join(certPath, certs["ca"])
	ca, err := ioutil.ReadFile(caFile)
	if err != nil {
		panic(err)
	}

	caPool := x509.NewCertPool()
	// AppendCertsFromPEM reports whether any certificate was added; the
	// result used to be discarded.
	if !caPool.AppendCertsFromPEM(ca) {
		log.Printf("warning: no CA certificates could be parsed from %s", caFile)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caPool,
		ServerName:   tlsHost,
	}
}
// tsvMu serializes writes to tsvBuf: recordLatencyMS is called from every
// worker goroutine and bytes.Buffer is not safe for concurrent use.
var tsvMu sync.Mutex

// recordLatencyMS measures the time elapsed since t in milliseconds and
// reports it for operation m with record index i.
//
// When -stats is set the value is sent on c. When -tsv is set, roughly one
// call in -sampleRate appends a tab-separated row (hostname, operation,
// index, latency, TLS flag, client count) to tsvBuf. A sampleRate of 1
// records every call; a non-positive sampleRate records nothing.
func recordLatencyMS(m string, i int, t time.Time, c chan float64) {
	ms := float64(time.Since(t)) / float64(time.Millisecond)
	if *enableStats {
		c <- ms
	}

	// Sample 1-in-sampleRate. The previous test (rand%sampleRate == 10)
	// could never match for sampleRate <= 10 and divided by zero for 0.
	if !*enableTSV || *sampleRate <= 0 || rand.Intn(*sampleRate) != 0 {
		return
	}

	row := strings.Join(
		[]string{
			hostname,
			m,
			strconv.Itoa(i),
			fmt.Sprintf("%f", ms),
			strconv.FormatBool(*enableTLS),
			strconv.Itoa(*clients),
		},
		"\t") + "\n"

	tsvMu.Lock()
	tsvBuf.WriteString(row)
	tsvMu.Unlock()
}
// set issues SET for key k (formatted as a decimal string) with value v on
// connection c. On success the latency is recorded under "set" with index
// k-offset; on failure the command error is returned and nothing is recorded.
func set(c redis.Conn, s chan float64, k int, v string) error {
	began := time.Now()
	key := strconv.Itoa(k)
	_, err := c.Do("SET", key, v)
	if err == nil {
		recordLatencyMS("set", k-*offset, began, s)
	}
	return err
}
// getRandom issues GET for a key chosen uniformly from
// [offset, offset+records) on connection c. On success the latency is
// recorded under "get" with index i; on failure the command error is
// returned and nothing is recorded.
func getRandom(c redis.Conn, s chan float64, i int) error {
	key := strconv.Itoa(rand.Intn(*records) + *offset)
	began := time.Now()
	_, err := c.Do("GET", key)
	if err == nil {
		recordLatencyMS("get", i, began, s)
	}
	return err
}
// consumeStats adds every value received on s to histogram h until s is
// closed, then signals wg.
func consumeStats(s chan float64, h *histogram.NumericHistogram, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		sample, open := <-s
		if !open {
			return
		}
		h.Add(sample)
	}
}
// benchmark runs one workload ("get", "set" or "connect") to completion.
//
// It starts -clients worker goroutines, each with its own connection, feeds
// them the ids offset..offset+records-1 over a channel, and collects the
// latencies they report into the histogram for that mode. It returns once
// all workers have finished and all reported latencies have been consumed.
//
// Any connection or command error panics, as does a client count below 1.
func benchmark(mode string) {
	if *clients < 1 {
		// With no workers the feed loop below would block forever.
		panic("benchmark: -clients must be at least 1")
	}

	// Load the TLS material once, and only when TLS is in use: the files
	// need not exist for plain TCP runs, and re-reading them for every
	// reconnect would count disk I/O in the "connect" latency.
	var tlsCfg *tls.Config
	if *enableTLS {
		tlsCfg = config()
	}

	src := make(chan int)
	statsCh := make(chan float64)

	var wg sync.WaitGroup
	wg.Add(*clients)
	for i := 0; i < *clients; i++ {
		go func() {
			defer wg.Done()
			c := connect(tlsCfg)
			// Close through a closure so the connection held when the
			// worker exits is closed; "connect" mode replaces c on every
			// iteration, and a plain `defer c.Close()` would be bound to
			// the first connection only.
			defer func() { c.Close() }()

			for n := range src {
				switch mode {
				case "get":
					if err := getRandom(c, statsCh, n-*offset); err != nil {
						panic(err)
					}
				case "set":
					v, err := uuid.NewUUID()
					if err != nil {
						panic(err)
					}
					if err := set(c, statsCh, n, v.String()); err != nil {
						panic(err)
					}
				case "connect":
					start := time.Now()
					c.Close()
					c = connect(tlsCfg)
					recordLatencyMS("connect", n-*offset, start, statsCh)
				}
			}
		}()
	}

	// setup stats consumer
	var hist *histogram.NumericHistogram
	switch mode {
	case "get":
		hist = getStats
	case "set":
		hist = setStats
	case "connect":
		hist = connectStats
	}
	var statsWg sync.WaitGroup
	if hist != nil {
		// Only wait for a consumer that was actually started; for an
		// unrecognised mode nothing is ever sent on statsCh.
		statsWg.Add(1)
		go consumeStats(statsCh, hist, &statsWg)
	}

	// populate src chan
	for i := 0; i < *records; i++ {
		src <- i + *offset
	}

	// shutdown: stop the workers first so nothing can send on statsCh
	// after it is closed.
	close(src)
	wg.Wait()
	close(statsCh)
	statsWg.Wait()
}
// run executes workload m via benchmark and logs its wall-clock runtime in
// seconds. With -stats it then prints the histogram summary for m; with
// -tsv it prints the sampled rows collected in tsvBuf to stdout and resets
// the buffer.
func run(m string) {
	began := time.Now()
	log.Println("starting", m, "workload")
	benchmark(m)
	elapsed := time.Since(began).Seconds()
	log.Println(m, "runtime:", elapsed)

	if *enableStats {
		byMode := map[string]*histogram.NumericHistogram{
			"set":     setStats,
			"get":     getStats,
			"connect": connectStats,
		}
		if h, known := byMode[m]; known {
			printStats(m, elapsed, h)
		}
	}

	if !*enableTSV {
		return
	}
	log.Println(m, "TSV:")
	fmt.Println(tsvBuf.String())
	tsvBuf.Reset()
}
// printStats logs a summary of histogram h for workload m: the sample
// count, the count divided by the elapsed seconds s, the mean, and the
// 0.5, 0.9, 0.95, 0.99 and 0.999 quantiles.
func printStats(m string, s float64, h *histogram.NumericHistogram) {
	log.Println(m, "ops:", h.Count())
	log.Println(m, "op/s:", h.Count()/s)
	log.Println(m, "avg latency (ms):", h.Mean())

	quantiles := []struct {
		label string
		q     float64
	}{
		{"median latency (ms):", 0.5},
		{"p90 latency (ms):", 0.9},
		{"p95 latency (ms):", 0.95},
		{"p99 latency (ms):", 0.99},
		{"p999 latency (ms):", 0.999},
	}
	for _, e := range quantiles {
		log.Println(m, e.label, h.Quantile(e.q))
	}
}
// main parses the flags, seeds math/rand, records the local hostname for
// TSV output and runs the workload(s) selected by -mode:
//
//	default  "set" followed by "get"
//	set      "set" only
//	get      "get" only
//	connect  "connect" only
//
// Any other -mode value is rejected with an error instead of silently doing
// nothing.
func main() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
	runtime.GOMAXPROCS(runtime.NumCPU() * 2)

	h, err := os.Hostname()
	if err != nil {
		panic(err)
	}
	hostname = h

	switch *mode {
	case "default":
		run("set")
		run("get")
	case "set":
		run("set")
	case "get":
		run("get")
	case "connect":
		run("connect")
	default:
		log.Fatalf("unknown -mode %q: want default, set, get or connect", *mode)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment