Last active
March 6, 2020 06:01
-
-
Save MyonKeminta/bf32114a471ed59c71acd8b232039691 to your computer and use it in GitHub Desktop.
Random Lock Generator for TiKV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"context" | |
"flag" | |
"fmt" | |
"github.com/pingcap/errors" | |
"github.com/pingcap/kvproto/pkg/kvrpcpb" | |
"github.com/pingcap/kvproto/pkg/metapb" | |
pd "github.com/pingcap/pd/client" | |
"github.com/pingcap/tidb/store/tikv" | |
"github.com/pingcap/tidb/store/tikv/oracle" | |
"github.com/pingcap/tidb/store/tikv/tikvrpc" | |
"github.com/pingcap/tidb/util/codec" | |
"math/rand" | |
"sync/atomic" | |
"time" | |
) | |
type LockGenerator struct { | |
pd pd.Client | |
kv tikv.Storage | |
concurrency int | |
lockDensity float64 | |
startedRegions int32 | |
finishedRegions int32 | |
} | |
func NewLockGenerator(pdAddr string, concurrency int, lockDensity float64) (*LockGenerator, error) { | |
pdClient, err := pd.NewClient([]string{pdAddr}, pd.SecurityOption{}) | |
if err != nil { | |
return nil, errors.Annotate(err, "create pd client failed") | |
} | |
driver := tikv.Driver{} | |
store, err := driver.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr)) | |
if err != nil { | |
return nil, errors.Trace(err) | |
} | |
kv := store.(tikv.Storage) | |
return &LockGenerator{ | |
pd: pdClient, | |
kv: kv, | |
concurrency: concurrency, | |
lockDensity: lockDensity, | |
}, nil | |
} | |
func (g *LockGenerator) Run() error { | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
go func() { | |
ticker := time.NewTicker(time.Minute) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case <-ticker.C: | |
fmt.Printf("Started Regions: %v, Finished Regions: %v\n", | |
atomic.LoadInt32(&g.startedRegions), | |
atomic.LoadInt32(&g.finishedRegions)) | |
} | |
} | |
}() | |
return g.generateLocks(ctx) | |
} | |
func (g *LockGenerator) generateLocks(ctx context.Context) error { | |
fmt.Println("start generating lock") | |
errCh := make(chan error, g.concurrency) | |
key := make([]byte, 0) | |
lastEndKey := make([]byte, 0) | |
currentRunningRegions := 0 | |
for { | |
fmt.Printf("scan region from %+q\n", key) | |
regions, _, err := g.pd.ScanRegions(ctx, key, 256) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
nextScanKey := regions[len(regions)-1].EndKey | |
nextScanKeyCopy := make([]byte, len(nextScanKey)) | |
copy(nextScanKeyCopy, nextScanKey) | |
for _, region := range regions { | |
err = decodeRegionMetaKey(region) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
if len(region.EndKey) > 0 && bytes.Compare(region.EndKey, lastEndKey) <= 0 { | |
fmt.Println("Region skipped") | |
continue | |
} | |
if bytes.Compare(region.StartKey, lastEndKey) < 0 { | |
fmt.Println("Region overlapped") | |
region.StartKey = lastEndKey | |
} | |
lastEndKey = region.EndKey | |
for currentRunningRegions >= g.concurrency { | |
select { | |
case <-ctx.Done(): | |
return nil | |
case err = <-errCh: | |
if err != nil { | |
return errors.Trace(err) | |
} | |
currentRunningRegions-- | |
atomic.AddInt32(&g.finishedRegions, 1) | |
} | |
} | |
atomic.AddInt32(&g.startedRegions, 1) | |
currentRunningRegions++ | |
go func() { errCh <- g.lockRange(ctx, region.StartKey, region.EndKey) }() | |
} | |
key = nextScanKeyCopy | |
if len(key) == 0 { | |
break | |
} | |
} | |
for currentRunningRegions >= g.concurrency { | |
select { | |
case <-ctx.Done(): | |
return nil | |
case err := <-errCh: | |
if err != nil { | |
return errors.Trace(err) | |
} | |
currentRunningRegions-- | |
atomic.AddInt32(&g.finishedRegions, 1) | |
} | |
} | |
return nil | |
} | |
func (g *LockGenerator) lockRange(ctx context.Context, startKey, endKey []byte) error { | |
//fmt.Printf("lock range %+q, %+q\n", startKey, endKey) | |
ts, err := g.kv.CurrentVersion() | |
if err != nil { | |
return errors.Trace(err) | |
} | |
snap, err := g.kv.GetSnapshot(ts) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
iter, err := snap.Iter(startKey, endKey) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
const maxTxnSize = 1000 | |
batch := make([][]byte, 0, maxTxnSize) | |
for iter.Valid() { | |
lockThis := rand.Float64() < g.lockDensity | |
if lockThis { | |
batch = append(batch, iter.Key()) | |
if len(batch) >= maxTxnSize { | |
err = g.lockKeys(ctx, batch) | |
if err != nil { | |
return err | |
} | |
batch = batch[:0] | |
} | |
} | |
err := iter.Next() | |
if err != nil { | |
return errors.Trace(err) | |
} | |
} | |
if len(batch) > 0 { | |
err = g.lockKeys(ctx, batch) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
} | |
return nil | |
} | |
func (g *LockGenerator) lockKeys(ctx context.Context, keys [][]byte) error { | |
primary := keys[0] | |
for len(keys) > 0 { | |
lockedKeys, err := g.lockBatch(ctx, keys, primary) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
keys = keys[lockedKeys:] | |
} | |
return nil | |
} | |
func (g *LockGenerator) lockBatch(ctx context.Context, keys [][]byte, primary []byte) (int, error) { | |
const maxBatchSize = 16 * 1024 | |
// TiKV client doesn't expose Prewrite interface directly. We need to manually locate the region and send the | |
// Prewrite requests. | |
for { | |
bo := tikv.NewBackoffer(ctx, 20000) | |
loc, err := g.kv.GetRegionCache().LocateKey(bo, keys[0]) | |
if err != nil { | |
return 0, errors.Trace(err) | |
} | |
// Get a timestamp to use as the startTs | |
startTs, err := g.getTs(ctx) | |
if err != nil { | |
return 0, errors.Trace(err) | |
} | |
// Pick a batch of keys and make up the mutations | |
var mutations []*kvrpcpb.Mutation | |
batchSize := 0 | |
for _, key := range keys { | |
if len(loc.EndKey) > 0 && bytes.Compare(key, loc.EndKey) >= 0 { | |
break | |
} | |
if bytes.Compare(key, loc.StartKey) < 0 { | |
break | |
} | |
value := randStr() | |
mutations = append(mutations, &kvrpcpb.Mutation{ | |
Op: kvrpcpb.Op_Put, | |
Key: key, | |
Value: []byte(value), | |
}) | |
batchSize += len(key) + len(value) | |
if batchSize >= maxBatchSize { | |
break | |
} | |
} | |
lockedKeys := len(mutations) | |
if lockedKeys == 0 { | |
return 0, nil | |
} | |
req := &tikvrpc.Request{ | |
Type: tikvrpc.CmdPrewrite, | |
Prewrite: &kvrpcpb.PrewriteRequest{ | |
Mutations: mutations, | |
PrimaryLock: primary, | |
StartVersion: startTs, | |
LockTtl: 1000 * 60, | |
}, | |
} | |
// Send the requests | |
resp, err := g.kv.SendReq(bo, req, loc.Region, time.Second*20) | |
if err != nil { | |
return 0, errors.Annotatef(err, "send request failed. region: %+v [%+q, %+q), keys: %+q", loc.Region, loc.StartKey, loc.EndKey, keys[0:lockedKeys]) | |
} | |
regionErr, err := resp.GetRegionError() | |
if err != nil { | |
return 0, errors.Trace(err) | |
} | |
if regionErr != nil { | |
err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String())) | |
if err != nil { | |
return 0, errors.Trace(err) | |
} | |
continue | |
} | |
prewriteResp := resp.Prewrite | |
if prewriteResp == nil { | |
return 0, errors.Errorf("response body missing") | |
} | |
// Ignore key errors since we never commit the transaction and we don't need to keep consistency here. | |
return lockedKeys, nil | |
} | |
} | |
func (g *LockGenerator) getTs(ctx context.Context) (uint64, error) { | |
physical, logical, err := g.pd.GetTS(ctx) | |
if err != nil { | |
return 0, errors.Trace(err) | |
} | |
ts := oracle.ComposeTS(physical, logical) | |
return ts, nil | |
} | |
func randStr() string { | |
length := rand.Intn(128) | |
res := "" | |
for i := 0; i < length; i++ { | |
res += string('a' + (rand.Int() % 26)) | |
} | |
return res | |
} | |
func decodeRegionMetaKey(r *metapb.Region) error { | |
if len(r.StartKey) != 0 { | |
_, decoded, err := codec.DecodeBytes(r.StartKey, nil) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
r.StartKey = decoded | |
} | |
if len(r.EndKey) != 0 { | |
_, decoded, err := codec.DecodeBytes(r.EndKey, nil) | |
if err != nil { | |
return errors.Trace(err) | |
} | |
r.EndKey = decoded | |
} | |
return nil | |
} | |
var ( | |
pdAddr = flag.String("addr", "127.0.0.1:2379", "pd addr") | |
concurrency = flag.Int("concurrency", 32, "concurrency. default: 32") | |
lockDensity = flag.Float64("lock-density", 0.05, "Density of locks to generate. default: 0.05") | |
) | |
func main() { | |
flag.Parse() | |
generator, err := NewLockGenerator(*pdAddr, *concurrency, *lockDensity) | |
if err != nil { | |
panic(err) | |
} | |
err = generator.Run() | |
if err != nil { | |
panic(err) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment