Skip to content

Instantly share code, notes, and snippets.

@MyonKeminta
Last active March 6, 2020 06:01
Show Gist options
  • Save MyonKeminta/bf32114a471ed59c71acd8b232039691 to your computer and use it in GitHub Desktop.
Random Lock Generator for TiKV
package main
import (
"bytes"
"context"
"flag"
"fmt"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/codec"
"math/rand"
"sync/atomic"
"time"
)
// LockGenerator scans every region of a TiKV cluster and leaves uncommitted
// prewrite locks on a random subset of keys. It is a testing tool (e.g. for
// exercising GC / lock-resolving paths); the locks are never committed.
type LockGenerator struct {
	// pd is the PD client used to scan region metadata and fetch timestamps.
	pd pd.Client
	// kv is the TiKV storage used to read snapshots and send raw requests.
	kv tikv.Storage
	// concurrency bounds how many regions are locked in parallel.
	concurrency int
	// lockDensity is the probability (0..1) that any scanned key gets locked.
	lockDensity float64
	// startedRegions / finishedRegions are progress counters, accessed
	// atomically from the reporter goroutine in Run.
	startedRegions int32
	finishedRegions int32
}
// NewLockGenerator connects to the cluster addressed by pdAddr and returns a
// generator that locks keys with probability lockDensity, processing at most
// concurrency regions in parallel. GC is disabled on the opened store so the
// generated locks are not resolved behind our back.
func NewLockGenerator(pdAddr string, concurrency int, lockDensity float64) (*LockGenerator, error) {
	pdClient, err := pd.NewClient([]string{pdAddr}, pd.SecurityOption{})
	if err != nil {
		return nil, errors.Annotate(err, "create pd client failed")
	}
	var driver tikv.Driver
	store, err := driver.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr))
	if err != nil {
		return nil, errors.Trace(err)
	}
	gen := &LockGenerator{
		pd:          pdClient,
		kv:          store.(tikv.Storage),
		concurrency: concurrency,
		lockDensity: lockDensity,
	}
	return gen, nil
}
// Run starts a background progress reporter and then generates locks across
// the whole cluster, blocking until generation finishes. The reporter is
// cancelled when Run returns.
func (g *LockGenerator) Run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Print progress once a minute until lock generation completes.
	go func() {
		ticker := time.NewTicker(time.Minute)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fmt.Printf("Started Regions: %v, Finished Regions: %v\n",
					atomic.LoadInt32(&g.startedRegions),
					atomic.LoadInt32(&g.finishedRegions))
			case <-ctx.Done():
				return
			}
		}
	}()

	return g.generateLocks(ctx)
}
// generateLocks iterates over all regions of the cluster and, for each region,
// starts a goroutine that writes random locks into the region's key range.
// At most g.concurrency regions are processed concurrently; the first error
// from any worker aborts the whole run.
func (g *LockGenerator) generateLocks(ctx context.Context) error {
	fmt.Println("start generating lock")
	errCh := make(chan error, g.concurrency)
	key := make([]byte, 0)
	lastEndKey := make([]byte, 0)
	currentRunningRegions := 0
	for {
		fmt.Printf("scan region from %+q\n", key)
		regions, _, err := g.pd.ScanRegions(ctx, key, 256)
		if err != nil {
			return errors.Trace(err)
		}
		// decodeRegionMetaKey below rewrites the keys in place, so save a copy
		// of the still-encoded end key of the last region for the next scan.
		nextScanKey := regions[len(regions)-1].EndKey
		nextScanKeyCopy := make([]byte, len(nextScanKey))
		copy(nextScanKeyCopy, nextScanKey)
		for _, region := range regions {
			err = decodeRegionMetaKey(region)
			if err != nil {
				return errors.Trace(err)
			}
			// Skip regions wholly before the last processed end key (stale
			// scan results after a region merge/split).
			if len(region.EndKey) > 0 && bytes.Compare(region.EndKey, lastEndKey) <= 0 {
				fmt.Println("Region skipped")
				continue
			}
			// Clamp overlapping ranges so each key is locked at most once.
			if bytes.Compare(region.StartKey, lastEndKey) < 0 {
				fmt.Println("Region overlapped")
				region.StartKey = lastEndKey
			}
			lastEndKey = region.EndKey
			// Block while the concurrency limit is saturated, harvesting
			// finished workers.
			for currentRunningRegions >= g.concurrency {
				select {
				case <-ctx.Done():
					return nil
				case err = <-errCh:
					if err != nil {
						return errors.Trace(err)
					}
					currentRunningRegions--
					atomic.AddInt32(&g.finishedRegions, 1)
				}
			}
			atomic.AddInt32(&g.startedRegions, 1)
			currentRunningRegions++
			// BUG FIX: capture the key range in locals before launching the
			// goroutine. The original closure captured the loop variable
			// `region`, which is reassigned every iteration (pre-Go 1.22
			// semantics), so a worker could observe a different region's keys.
			startKey, endKey := region.StartKey, region.EndKey
			go func() { errCh <- g.lockRange(ctx, startKey, endKey) }()
		}
		key = nextScanKeyCopy
		if len(key) == 0 {
			break
		}
	}
	// BUG FIX: wait for *all* outstanding workers. The original loop only
	// waited while `currentRunningRegions >= g.concurrency`, so up to
	// g.concurrency-1 regions could still be locking when Run returned and
	// the process exited.
	for currentRunningRegions > 0 {
		select {
		case <-ctx.Done():
			return nil
		case err := <-errCh:
			if err != nil {
				return errors.Trace(err)
			}
			currentRunningRegions--
			atomic.AddInt32(&g.finishedRegions, 1)
		}
	}
	return nil
}
// lockRange scans [startKey, endKey) through a snapshot and leaves a lock on
// each existing key independently with probability g.lockDensity. Keys are
// locked in batches of at most maxTxnSize.
func (g *LockGenerator) lockRange(ctx context.Context, startKey, endKey []byte) error {
	ts, err := g.kv.CurrentVersion()
	if err != nil {
		return errors.Trace(err)
	}
	snap, err := g.kv.GetSnapshot(ts)
	if err != nil {
		return errors.Trace(err)
	}
	iter, err := snap.Iter(startKey, endKey)
	if err != nil {
		return errors.Trace(err)
	}
	// BUG FIX: the iterator was never closed, leaking its resources for every
	// region scanned.
	defer iter.Close()
	const maxTxnSize = 1000
	batch := make([][]byte, 0, maxTxnSize)
	for iter.Valid() {
		if rand.Float64() < g.lockDensity {
			batch = append(batch, iter.Key())
			if len(batch) >= maxTxnSize {
				if err := g.lockKeys(ctx, batch); err != nil {
					return errors.Trace(err)
				}
				batch = batch[:0]
			}
		}
		if err := iter.Next(); err != nil {
			return errors.Trace(err)
		}
	}
	// Flush the remaining partial batch.
	if len(batch) > 0 {
		if err := g.lockKeys(ctx, batch); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
// lockKeys locks all the given keys as one (never-committed) transaction with
// keys[0] as the primary, splitting the work into region- and size-bounded
// batches via lockBatch.
func (g *LockGenerator) lockKeys(ctx context.Context, keys [][]byte) error {
	primary := keys[0]
	for len(keys) > 0 {
		lockedKeys, err := g.lockBatch(ctx, keys, primary)
		if err != nil {
			return errors.Trace(err)
		}
		// BUG FIX: lockBatch returns (0, nil) when no key fits the located
		// region; the original looped forever on `keys` in that case.
		if lockedKeys == 0 {
			return errors.Errorf("lockBatch made no progress, %d keys remaining", len(keys))
		}
		keys = keys[lockedKeys:]
	}
	return nil
}
// lockBatch locks a prefix of keys that falls into a single region by sending
// a raw Prewrite request that is never committed. It returns how many keys of
// the prefix were locked, retrying internally on region errors.
// NOTE(review): it returns (0, nil) when no key fits the located region, so
// callers must not assume progress is always made.
func (g *LockGenerator) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (int, error) {
	// Cap on total request payload (keys + values) per prewrite.
	const maxBatchSize = 16 * 1024
	// TiKV client doesn't expose Prewrite interface directly. We need to manually locate the region and send the
	// Prewrite requests.
	for {
		bo := tikv.NewBackoffer(ctx, 20000)
		loc, err := g.kv.GetRegionCache().LocateKey(bo, keys[0])
		if err != nil {
			return 0, errors.Trace(err)
		}
		// Get a timestamp to use as the startTs
		startTs, err := g.getTs(ctx)
		if err != nil {
			return 0, errors.Trace(err)
		}
		// Pick a batch of keys and make up the mutations
		var mutations []*kvrpcpb.Mutation
		batchSize := 0
		for _, key := range keys {
			// Stop at the first key at or beyond the region's end.
			if len(loc.EndKey) > 0 && bytes.Compare(key, loc.EndKey) >= 0 {
				break
			}
			// Stop if the key precedes the region's start — it cannot be
			// locked through this region.
			if bytes.Compare(key, loc.StartKey) < 0 {
				break
			}
			value := randStr()
			mutations = append(mutations, &kvrpcpb.Mutation{
				Op:    kvrpcpb.Op_Put,
				Key:   key,
				Value: []byte(value),
			})
			batchSize += len(key) + len(value)
			if batchSize >= maxBatchSize {
				break
			}
		}
		lockedKeys := len(mutations)
		if lockedKeys == 0 {
			return 0, nil
		}
		req := &tikvrpc.Request{
			Type: tikvrpc.CmdPrewrite,
			Prewrite: &kvrpcpb.PrewriteRequest{
				Mutations:    mutations,
				PrimaryLock:  primary,
				StartVersion: startTs,
				// 60s TTL so the locks live long enough to be observed.
				LockTtl: 1000 * 60,
			},
		}
		// Send the requests
		resp, err := g.kv.SendReq(bo, req, loc.Region, time.Second*20)
		if err != nil {
			return 0, errors.Annotatef(err, "send request failed. region: %+v [%+q, %+q), keys: %+q", loc.Region, loc.StartKey, loc.EndKey, keys[0:lockedKeys])
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return 0, errors.Trace(err)
		}
		if regionErr != nil {
			// The region moved or split; back off and retry with a fresh
			// location from the region cache.
			err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String()))
			if err != nil {
				return 0, errors.Trace(err)
			}
			continue
		}
		prewriteResp := resp.Prewrite
		if prewriteResp == nil {
			return 0, errors.Errorf("response body missing")
		}
		// Ignore key errors since we never commit the transaction and we don't need to keep consistency here.
		return lockedKeys, nil
	}
}
// getTs fetches a fresh timestamp from PD and composes its physical and
// logical parts into a single TSO value.
func (g *LockGenerator) getTs(ctx context.Context) (uint64, error) {
	physical, logical, err := g.pd.GetTS(ctx)
	if err != nil {
		return 0, errors.Trace(err)
	}
	return oracle.ComposeTS(physical, logical), nil
}
// randStr returns a random string of length 0..127 made of lowercase ASCII
// letters, used as a throwaway lock value.
func randStr() string {
	length := rand.Intn(128)
	// Build into a byte slice: the original appended to a string in a loop,
	// which allocates a new string per character (quadratic).
	buf := make([]byte, length)
	for i := range buf {
		buf[i] = byte('a' + rand.Intn(26))
	}
	return string(buf)
}
// decodeRegionMetaKey decodes the codec-encoded start and end keys of the
// region metadata in place. Empty keys (the cluster boundaries) are kept
// untouched.
func decodeRegionMetaKey(r *metapb.Region) error {
	if len(r.StartKey) > 0 {
		_, start, err := codec.DecodeBytes(r.StartKey, nil)
		if err != nil {
			return errors.Trace(err)
		}
		r.StartKey = start
	}
	if len(r.EndKey) > 0 {
		_, end, err := codec.DecodeBytes(r.EndKey, nil)
		if err != nil {
			return errors.Trace(err)
		}
		r.EndKey = end
	}
	return nil
}
// Command-line flags.
var (
	// pdAddr is the address of the PD server of the target cluster.
	pdAddr = flag.String("addr", "127.0.0.1:2379", "pd addr")
	// concurrency bounds how many regions are locked in parallel.
	concurrency = flag.Int("concurrency", 32, "concurrency. default: 32")
	// lockDensity is the per-key probability of leaving a lock.
	lockDensity = flag.Float64("lock-density", 0.05, "Density of locks to generate. default: 0.05")
)
// main parses flags, builds a LockGenerator and runs it to completion,
// panicking on any error.
func main() {
	flag.Parse()
	gen, err := NewLockGenerator(*pdAddr, *concurrency, *lockDensity)
	if err != nil {
		panic(err)
	}
	if err = gen.Run(); err != nil {
		panic(err)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment