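// mvcc is a small benchmark that drives a TiKV cluster through the
// tidb kv.Storage interface with a configurable mix of reads, writes,
// and deliberately conflicting writes, exporting latency histograms
// and throughput counters to Prometheus.
//
// Example invocation (flags and defaults are defined below):
//
//	go build -o mvcc && ./mvcc -pd 127.0.0.1:2379 -c 200 -read-rate 0.8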
package main

import (
	"context"
	"encoding/binary"
	"flag"
	"fmt"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/store/tikv"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"
	log "github.com/sirupsen/logrus"
)
var (
	pd         = flag.String("pd", "127.0.0.1:2379", "PD endpoints")
	statusAddr = flag.String("status", ":10081", "Status listening address")
	pushAddr   = flag.String("push", "", "Prometheus push address")
	concurrent = flag.Int("c", 200, "Number of concurrent workers")
	duration   = flag.Duration("d", 0, "Run duration. If 0, run forever")

	// Workload parameters
	rowNumber    = flag.Int("row", 10000000, "Number of rows")
	colNumber    = flag.Int("col", 20, "Number of columns a row has")
	colSize      = flag.Int("size", 1024, "Data size of a column")
	readRate     = flag.Float64("read-rate", 0.8, "Read rate")
	conflictRate = flag.Float64("conflict-rate", 0.008, "Write conflict rate")
)
type statistic int

const (
	nonEmptyReads statistic = iota
	emptyReads
	writes
	writeErrors
	readErrors
	conflictWrites
	conflictWriteErrors
	statsLength
)
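// globalStats holds one counter per statistic. Workers bump the
// counters atomically and printStatistics snapshots them, so no lock
// is needed on the hot path.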
var globalStats [statsLength]uint64

func snapshotStats() (s [statsLength]uint64) {
	for i := 0; i < int(statsLength); i++ {
		s[i] = atomic.LoadUint64(&globalStats[i])
	}
	return s
}
// Worker executes the operations
type Worker struct {
	db     kv.Storage
	r      *rand.Rand
	ctx    context.Context
	cancel context.CancelFunc

	offsetRow int
	rowCount  int

	// Cached to avoid frequent allocation
	colKeys [][]byte
	colData []byte
}
// Read reads a row
func (w *Worker) Read() {
	start := time.Now()
	defer func() { cmdDuration.WithLabelValues("read").Observe(time.Since(start).Seconds()) }()

	empty, err := w.read()
	if err != nil {
		atomic.AddUint64(&globalStats[readErrors], 1)
		return
	}
	if !empty {
		atomic.AddUint64(&globalStats[nonEmptyReads], 1)
		return
	}
	atomic.AddUint64(&globalStats[emptyReads], 1)
}

// Write writes a row
func (w *Worker) Write() {
	start := time.Now()
	defer func() { cmdDuration.WithLabelValues("write").Observe(time.Since(start).Seconds()) }()

	if err := w.write(false); err != nil {
		atomic.AddUint64(&globalStats[writeErrors], 1)
		return
	}
	atomic.AddUint64(&globalStats[writes], 1)
}

// ConflictWrite writes row 0
func (w *Worker) ConflictWrite() {
	start := time.Now()
	defer func() { cmdDuration.WithLabelValues("conflict_write").Observe(time.Since(start).Seconds()) }()

	if err := w.write(true); err != nil {
		atomic.AddUint64(&globalStats[conflictWriteErrors], 1)
		return
	}
	atomic.AddUint64(&globalStats[conflictWrites], 1)
}
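// buildKeys writes the target row id into the first 8 bytes of every
// cached column key. Conflict writes always use row 0, so concurrent
// workers are guaranteed to collide on the same keys.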
func (w *Worker) buildKeys(useConflict bool) {
	id := 0
	if !useConflict {
		id = w.r.Intn(w.rowCount) + w.offsetRow
	}
	for i := 0; i < len(w.colKeys); i++ {
		binary.BigEndian.PutUint64(w.colKeys[i], uint64(id))
	}
}
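// read fetches every column of one random row in a single transaction.
// It reports the row as empty if any column is missing.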
func (w *Worker) read() (bool, error) {
	txn, err := w.db.Begin()
	if err != nil {
		return false, err
	}
	defer txn.Rollback()

	w.buildKeys(false)
	empty := false
	for _, key := range w.colKeys {
		data, err := txn.Get(key)
		if err != nil && !kv.IsErrNotFound(err) {
			return false, err
		}
		if data == nil {
			empty = true
		}
	}
	if err := txn.Commit(w.ctx); err != nil {
		return false, err
	}
	return empty, nil
}
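// write sets every column of one row in a single transaction. With
// useConflict, all workers target row 0, which deliberately exercises
// TiKV's write-conflict path.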
func (w *Worker) write(useConflict bool) error {
	txn, err := w.db.Begin()
	if err != nil {
		return err
	}
	defer txn.Rollback()

	w.buildKeys(useConflict)
	for _, key := range w.colKeys {
		if err := txn.Set(key, w.colData); err != nil {
			return err
		}
	}
	return txn.Commit(w.ctx)
}
// Run executes operations until the context is cancelled, choosing
// between reads, plain writes, and conflict writes according to the
// read-rate and conflict-rate flags.
func (w *Worker) Run() {
	defer w.cancel()

	for {
		select {
		case <-w.ctx.Done():
			return
		default:
			if w.chooseRead() {
				w.Read()
			} else if w.useConflict() {
				w.ConflictWrite()
			} else {
				w.Write()
			}
		}
	}
}

func (w *Worker) chooseRead() bool {
	if *readRate == 0 {
		return false
	}
	return w.r.Float64() <= *readRate
}

func (w *Worker) useConflict() bool {
	return w.r.Float64() <= *conflictRate
}
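// letters is the alphabet from which randData draws the payload
// written to every column.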
var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func (w *Worker) randData(length int) []byte {
	buf := make([]byte, length)
	for i := range buf {
		buf[i] = letters[w.r.Intn(len(letters))]
	}
	return buf
}
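// newWorker gives each worker a disjoint slice of rowNumber/concurrent
// rows and pre-allocates its column keys. A key is 10 bytes: an 8-byte
// big-endian row id followed by a 2-byte column index.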
func newWorker(ctx context.Context, db kv.Storage, index int) *Worker {
	step := *rowNumber / *concurrent
	// Mix the worker index into the seed so workers created in the same
	// instant do not share a random sequence.
	source := rand.NewSource(time.Now().UnixNano() + int64(index))
	subCtx, subCancel := context.WithCancel(ctx)
	w := &Worker{
		db:        db,
		r:         rand.New(source),
		ctx:       subCtx,
		cancel:    subCancel,
		offsetRow: index * step,
		rowCount:  step,
		colKeys:   make([][]byte, *colNumber),
	}
	w.colData = w.randData(*colSize)
	for i := 0; i < len(w.colKeys); i++ {
		w.colKeys[i] = make([]byte, 8+2)
		// The row id occupies the first 8 bytes (filled in by buildKeys);
		// the column index occupies the last 2.
		binary.BigEndian.PutUint64(w.colKeys[i][0:8], 0)
		binary.BigEndian.PutUint16(w.colKeys[i][8:], uint16(i))
	}
	return w
}
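// main starts the pprof/metrics HTTP endpoint and the optional
// Pushgateway pusher, opens the TiKV store through PD, launches the
// workers, and waits for the duration timer or a signal to stop them.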
func main() {
	flag.Parse()

	go func() {
		http.Handle("/metrics", prometheus.Handler())
		if err := http.ListenAndServe(*statusAddr, nil); err != nil {
			log.Errorf("listen on %s failed: %v", *statusAddr, err)
		}
	}()
	PushMetrics(*pushAddr)

	ctx, cancel := context.WithCancel(context.Background())

	sc := make(chan os.Signal, 1)
	signal.Notify(sc,
		syscall.SIGHUP,
		syscall.SIGINT,
		syscall.SIGTERM,
		syscall.SIGQUIT)

	var wg sync.WaitGroup
	wg.Add(*concurrent)

	driver := tikv.Driver{}
	tikv.MaxConnectionCount = 128
	db, err := driver.Open(fmt.Sprintf("tikv://%s?disableGC=true", *pd))
	if err != nil {
		log.Fatalf("open TiKV storage failed %v", err)
	}

	for i := 0; i < *concurrent; i++ {
		w := newWorker(ctx, db, i)
		go func() {
			defer wg.Done()
			w.Run()
		}()
	}

	if *duration > 0 {
		go func() {
			time.Sleep(*duration)
			cancel()
		}()
	}

	go func() {
		sig := <-sc
		log.Infof("Got signal [%d] to exit.", sig)
		cancel()
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		printStatistics(ctx)
	}()
	wg.Wait()
}
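// printStatistics prints per-second deltas for every counter, repeats
// the header every 20 lines, and emits a cumulative summary when the
// context is cancelled.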
func printStatistics(ctx context.Context) {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	start := time.Now()
	lastNow := time.Now()
	var lastOpsCount uint64
	var lastStats [statsLength]uint64
	for i := 0; ; {
		select {
		case <-ctx.Done():
			stats := snapshotStats()
			opsCount := stats[writes] + stats[emptyReads] +
				stats[nonEmptyReads] + stats[conflictWrites]
			elapsed := time.Since(start).Seconds()
			fmt.Printf("elapsed______ops/sec__reads/empty/errors___writes/errors____conflict/errors\n")
			// time.Duration(x+0.5)*time.Second rounds the elapsed seconds
			// to the nearest whole second for display.
			fmt.Printf("%7s %12.1f %19s %15s %15s\n",
				time.Duration(elapsed+0.5)*time.Second,
				float64(opsCount)/elapsed,
				fmt.Sprintf("%d / %d / %d",
					stats[nonEmptyReads],
					stats[emptyReads],
					stats[readErrors]),
				fmt.Sprintf("%d / %d",
					stats[writes],
					stats[writeErrors]),
				fmt.Sprintf("%d / %d",
					stats[conflictWrites],
					stats[conflictWriteErrors]))
			return
		case <-tick.C:
			now := time.Now()
			elapsed := now.Sub(lastNow)
			stats := snapshotStats()
			opsCount := stats[writes] + stats[emptyReads] +
				stats[nonEmptyReads] + stats[conflictWrites]
			if i%20 == 0 {
				fmt.Printf("elapsed______ops/sec__reads/empty/errors___writes/errors____conflict/errors\n")
			}
			fmt.Printf("%7s %12.1f %19s %15s %15s\n",
				time.Duration(time.Since(start).Seconds()+0.5)*time.Second,
				float64(opsCount-lastOpsCount)/elapsed.Seconds(),
				fmt.Sprintf("%d / %d / %d",
					stats[nonEmptyReads]-lastStats[nonEmptyReads],
					stats[emptyReads]-lastStats[emptyReads],
					stats[readErrors]-lastStats[readErrors]),
				fmt.Sprintf("%d / %d",
					stats[writes]-lastStats[writes],
					stats[writeErrors]-lastStats[writeErrors]),
				fmt.Sprintf("%d / %d",
					stats[conflictWrites]-lastStats[conflictWrites],
					stats[conflictWriteErrors]-lastStats[conflictWriteErrors]))
			lastStats = stats
			lastOpsCount = opsCount
			lastNow = now
			i++
		}
	}
}
// pushPrometheus pushes metrics to the Prometheus Pushgateway at addr
// every interval, forever.
func pushPrometheus(job, addr string, interval time.Duration) {
	for {
		err := push.FromGatherer(
			job, push.HostnameGroupingKey(),
			addr,
			prometheus.DefaultGatherer,
		)
		if err != nil {
			log.Errorf("could not push metrics to Prometheus Pushgateway: %v", err)
		}
		time.Sleep(interval)
	}
}
// PushMetrics pushes metrics to the given address in the background.
func PushMetrics(address string) {
	if len(address) == 0 {
		log.Info("disable Prometheus push client")
		return
	}
	log.Info("start Prometheus push client")
	go pushPrometheus("mvcc", address, 5*time.Second)
}
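// cmdDuration records per-command latency. The 20 exponential buckets
// start at 0.5ms and double each step, topping out around 262s.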
var (
	cmdDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "pin",
			Subsystem: "mvcc",
			Name:      "cmd_duration_seconds",
			Help:      "Duration of commands.",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 20),
		}, []string{"type"})
)

func init() {
	prometheus.MustRegister(cmdDuration)
}