Last active
January 19, 2018 02:50
-
-
Save siddontang/6c2e7656460b37723a487bd6da68d6db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/binary" | |
"flag" | |
"fmt" | |
"math/rand" | |
"net/http" | |
_ "net/http/pprof" | |
"os" | |
"os/signal" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
"github.com/pingcap/tidb/kv" | |
"github.com/pingcap/tidb/store/tikv" | |
"github.com/prometheus/client_golang/prometheus" | |
"github.com/prometheus/client_golang/prometheus/push" | |
log "github.com/sirupsen/logrus" | |
) | |
var ( | |
pd = flag.String("pd", "127.0.0.1:2379", "PD endpoints") | |
statusAddr = flag.String("status", ":10081", "Status listening address") | |
pushAddr = flag.String("push", "", "Prometheus push address") | |
concurrent = flag.Int("c", 200, "Number of concurrent workers") | |
duration = flag.Duration("d", 0, "Run duration. If 0, run forever") | |
// For case | |
rowNumber = flag.Int("row", 10000000, "Number of rows") | |
colNumber = flag.Int("col", 20, "Number of columns a row has") | |
colSize = flag.Int("size", 1024, "Data size of a column") | |
readRate = flag.Float64("read-rate", 0.8, "Read rate") | |
conflictRate = flag.Float64("conflict-rate", 0.008, "Write conflict rate") | |
) | |
// statistic names one slot in the global counter table.
type statistic int

// Counter slots. statsLength is not a counter: it must stay last because it
// is used as the size of the counter table.
const (
	// nonEmptyReads counts successful reads in which every column had data.
	nonEmptyReads statistic = iota
	// emptyReads counts successful reads in which at least one column was missing.
	emptyReads
	// writes counts successful writes to a randomly chosen row.
	writes
	// writeErrors counts failed writes to a randomly chosen row.
	writeErrors
	// readErrors counts failed reads.
	readErrors
	// conflictWrites counts successful writes to the shared conflict row.
	conflictWrites
	// conflictWriteErrors counts failed writes to the shared conflict row.
	conflictWriteErrors

	statsLength
)

// globalStats holds one counter per statistic. Workers bump entries with
// atomic adds and the reporter reads them with atomic loads.
var globalStats [statsLength]uint64
func snapshotStats() (s [statsLength]uint64) { | |
for i := 0; i < int(statsLength); i++ { | |
s[i] = atomic.LoadUint64(&globalStats[i]) | |
} | |
return s | |
} | |
// Worker executes the operations | |
type Worker struct { | |
db kv.Storage | |
r *rand.Rand | |
ctx context.Context | |
cancel context.CancelFunc | |
offsetRow int | |
rowCount int | |
// Cache sometine to avoid frequent allocation | |
colKeys [][]byte | |
colData []byte | |
} | |
// Read reads a row | |
func (w *Worker) Read() { | |
start := time.Now() | |
defer func() { cmdDuration.WithLabelValues("read").Observe(time.Since(start).Seconds()) }() | |
empty, err := w.read() | |
if err != nil { | |
atomic.AddUint64(&globalStats[readErrors], 1) | |
return | |
} | |
if !empty { | |
atomic.AddUint64(&globalStats[nonEmptyReads], 1) | |
return | |
} | |
atomic.AddUint64(&globalStats[emptyReads], 1) | |
return | |
} | |
// Write writes a row | |
func (w *Worker) Write() { | |
start := time.Now() | |
defer func() { cmdDuration.WithLabelValues("write").Observe(time.Since(start).Seconds()) }() | |
if err := w.write(false); err != nil { | |
atomic.AddUint64(&globalStats[writeErrors], 1) | |
return | |
} | |
atomic.AddUint64(&globalStats[writes], 1) | |
return | |
} | |
// ConflictWrite writes row 0 | |
func (w *Worker) ConflictWrite() { | |
start := time.Now() | |
defer func() { cmdDuration.WithLabelValues("conflict_write").Observe(time.Since(start).Seconds()) }() | |
if err := w.write(true); err != nil { | |
atomic.AddUint64(&globalStats[conflictWriteErrors], 1) | |
return | |
} | |
atomic.AddUint64(&globalStats[conflictWrites], 1) | |
return | |
} | |
func (w *Worker) buildKeys(useConflict bool) { | |
id := 0 | |
if !useConflict { | |
id = w.r.Intn(w.rowCount) + w.offsetRow | |
} | |
for i := 0; i < len(w.colKeys); i++ { | |
binary.BigEndian.PutUint64(w.colKeys[i], uint64(id)) | |
} | |
} | |
func (w *Worker) read() (bool, error) { | |
txn, err := w.db.Begin() | |
if err != nil { | |
return false, err | |
} | |
defer txn.Rollback() | |
w.buildKeys(false) | |
empty := false | |
for _, key := range w.colKeys { | |
data, err := txn.Get(key) | |
if err != nil && !kv.IsErrNotFound(err) { | |
return false, err | |
} | |
if data == nil { | |
empty = true | |
} | |
} | |
if err := txn.Commit(w.ctx); err != nil { | |
return false, err | |
} | |
return empty, nil | |
} | |
func (w *Worker) write(useConflict bool) error { | |
txn, err := w.db.Begin() | |
if err != nil { | |
return err | |
} | |
defer txn.Rollback() | |
w.buildKeys(useConflict) | |
for _, key := range w.colKeys { | |
if err := txn.Set(key, w.colData); err != nil { | |
return err | |
} | |
} | |
return txn.Commit(w.ctx) | |
} | |
// Run runs the worker | |
func (w *Worker) Run() { | |
defer w.cancel() | |
for { | |
select { | |
case <-w.ctx.Done(): | |
return | |
default: | |
if w.chooseRead() { | |
w.Read() | |
} else { | |
if w.useConflict() { | |
w.ConflictWrite() | |
} else { | |
w.Write() | |
} | |
} | |
} | |
} | |
} | |
func (w *Worker) chooseRead() bool { | |
if *readRate == 0 { | |
return false | |
} | |
rate := w.r.Float64() | |
return rate <= *readRate | |
} | |
func (w *Worker) useConflict() bool { | |
rate := w.r.Float64() | |
return rate <= *conflictRate | |
} | |
var letters = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") | |
func (w *Worker) randData(length int) []byte { | |
buf := make([]byte, length) | |
for i := range buf { | |
buf[i] = letters[w.r.Intn(len(letters))] | |
} | |
return buf | |
} | |
func newWorker(ctx context.Context, db kv.Storage, index int) *Worker { | |
step := *rowNumber / *concurrent | |
source := rand.NewSource(int64(time.Now().UnixNano())) | |
subCtx, subCancel := context.WithCancel(ctx) | |
w := &Worker{ | |
db: db, | |
r: rand.New(source), | |
ctx: subCtx, | |
cancel: subCancel, | |
offsetRow: index * step, | |
rowCount: step, | |
colKeys: make([][]byte, *colNumber), | |
} | |
w.colData = w.randData(*colSize) | |
for i := 0; i < len(w.colKeys); i++ { | |
w.colKeys[i] = make([]byte, 8+2) | |
binary.BigEndian.PutUint16(w.colKeys[i][0:8], 0) | |
binary.BigEndian.PutUint16(w.colKeys[i][8:], uint16(i)) | |
} | |
return w | |
} | |
func main() { | |
flag.Parse() | |
go func() { | |
http.Handle("/metrics", prometheus.Handler()) | |
http.ListenAndServe(*statusAddr, nil) | |
}() | |
PushMetrics(*pushAddr) | |
ctx, cancel := context.WithCancel(context.Background()) | |
sc := make(chan os.Signal, 1) | |
signal.Notify(sc, | |
syscall.SIGHUP, | |
syscall.SIGINT, | |
syscall.SIGTERM, | |
syscall.SIGQUIT) | |
var wg sync.WaitGroup | |
wg.Add(*concurrent) | |
driver := tikv.Driver{} | |
tikv.MaxConnectionCount = 128 | |
db, err := driver.Open(fmt.Sprintf("tikv://%s?disableGC=true", *pd)) | |
if err != nil { | |
log.Fatalf("open TiKV storage failed %v", err) | |
} | |
for i := 0; i < *concurrent; i++ { | |
w := newWorker(ctx, db, i) | |
go func() { | |
defer wg.Done() | |
w.Run() | |
}() | |
} | |
if *duration > 0 { | |
go func() { | |
time.Sleep(*duration) | |
cancel() | |
}() | |
} | |
go func() { | |
sig := <-sc | |
log.Infof("Got signal [%d] to exit.", sig) | |
cancel() | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
printStatistics(ctx) | |
}() | |
wg.Wait() | |
} | |
func printStatistics(ctx context.Context) { | |
tick := time.NewTicker(time.Second) | |
defer tick.Stop() | |
start := time.Now() | |
lastNow := time.Now() | |
var lastOpsCount uint64 | |
var lastStats [statsLength]uint64 | |
for i := 0; ; { | |
select { | |
case <-ctx.Done(): | |
stats := snapshotStats() | |
opsCount := stats[writes] + stats[emptyReads] + | |
stats[nonEmptyReads] + stats[conflictWrites] | |
elapsed := time.Since(start).Seconds() | |
fmt.Printf("elapsed______ops/sec__reads/empty/errors___writes/errors____conflict/errors\n") | |
fmt.Printf("%7s %12.1f %19s %15s %15s\n", | |
time.Duration(elapsed+0.5)*time.Second, | |
float64(opsCount)/elapsed, | |
fmt.Sprintf("%d / %d / %d", | |
stats[nonEmptyReads], | |
stats[emptyReads], | |
stats[readErrors]), | |
fmt.Sprintf("%d / %d", | |
stats[writes], | |
stats[writeErrors]), | |
fmt.Sprintf("%d / %d", | |
stats[conflictWrites], | |
stats[conflictWriteErrors])) | |
return | |
case <-tick.C: | |
now := time.Now() | |
elapsed := now.Sub(lastNow) | |
stats := snapshotStats() | |
opsCount := stats[writes] + stats[emptyReads] + | |
stats[nonEmptyReads] + stats[conflictWrites] | |
if i%20 == 0 { | |
fmt.Printf("elapsed______ops/sec__reads/empty/errors___writes/errors____conflict/errors\n") | |
} | |
fmt.Printf("%7s %12.1f %19s %15s %15s\n", | |
time.Duration(time.Since(start).Seconds()+0.5)*time.Second, | |
float64(opsCount-lastOpsCount)/elapsed.Seconds(), | |
fmt.Sprintf("%d / %d / %d", | |
stats[nonEmptyReads]-lastStats[nonEmptyReads], | |
stats[emptyReads]-lastStats[emptyReads], | |
stats[readErrors]-lastStats[readErrors]), | |
fmt.Sprintf("%d / %d", | |
stats[writes]-lastStats[writes], | |
stats[writeErrors]-lastStats[writeErrors]), | |
fmt.Sprintf("%d / %d", | |
stats[conflictWrites]-lastStats[conflictWrites], | |
stats[conflictWriteErrors]-lastStats[conflictWriteErrors])) | |
lastStats = stats | |
lastOpsCount = opsCount | |
lastNow = now | |
i++ | |
} | |
} | |
} | |
// pushPrometheus pushes metrics to Prometheus Pushgateway. | |
func pushPrometheus(job, addr string, interval time.Duration) { | |
for { | |
err := push.FromGatherer( | |
job, push.HostnameGroupingKey(), | |
addr, | |
prometheus.DefaultGatherer, | |
) | |
if err != nil { | |
log.Errorf("could not push metrics to Prometheus Pushgateway: %v", err) | |
} | |
time.Sleep(interval) | |
} | |
} | |
// PushMetrics pushs metircs in background. | |
func PushMetrics(address string) { | |
if len(address) == 0 { | |
log.Info("disable Prometheus push client") | |
return | |
} | |
log.Info("start Prometheus push client") | |
go pushPrometheus("mvcc", address, 5*time.Second) | |
} | |
var ( | |
cmdDuration = prometheus.NewHistogramVec( | |
prometheus.HistogramOpts{ | |
Namespace: "pin", | |
Subsystem: "mvcc", | |
Name: "cmd_duration_seconds", | |
Help: "Duration of commands.", | |
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 20), | |
}, []string{"type"}) | |
) | |
func init() { | |
prometheus.MustRegister(cmdDuration) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment