Skip to content

Instantly share code, notes, and snippets.

@MyonKeminta
Last active April 7, 2020 05:37
Show Gist options
  • Save MyonKeminta/6a9d60111bbe2f6f96b091ac474d2fc4 to your computer and use it in GitHub Desktop.
Save MyonKeminta/6a9d60111bbe2f6f96b091ac474d2fc4 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"encoding/binary"
"encoding/hex"
"flag"
"fmt"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
"hash/crc64"
"math/rand"
"time"
)
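// Supported modes (selected with -mode):
//   * rand-gen: keep writing random key-value pairs into the key range
//   * checksum: scan the key range and log a checksum of its contents
//   * scan: scan the key range and print every key-value pair
//   * diff: compare the key range against another cluster (not implemented yet)
//
// Ref TiDB: https://github.com/pingcap/tidb/commit/7a7b2d82a70858544d7b711c9cb8499bda5d6eb7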
// Command-line flags. Every variable is a pointer returned by the flag
// package; the values are only meaningful after flag.Parse has run in main.
var (
// pdAddr is the PD address used to create the primary RawKV client.
pdAddr = flag.String("pd-addr", "127.0.0.1:2379", "Address of PD")
// runMode selects the operation main dispatches to. Besides the modes listed
// in the help text, main also accepts "test-rand-key".
runMode = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan' and 'diff'")
// startKeyStr and endKeyStr are the hex-encoded bounds of the key range;
// main decodes them and rejects an empty end key.
startKeyStr = flag.String("start-key", "", "Start key in hex")
endKeyStr = flag.String("end-key", "", "End key in hex")
// keyMaxLen is passed as maxLen to randGen and testRandKey.
keyMaxLen = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode")
// diffPdAddr is the PD address used to create the second client in diff mode.
diffPdAddr = flag.String("diff-pd-addr", "127.0.0.1:2380", "PD address of another client to diff data against the current client. Only for diff mode.")
// concurrency is the number of writer goroutines started by randGen.
concurrency = flag.Int("concurrency", 32, "Concurrency to run rand-gen")
// targetCf is forwarded as the cf argument of every mode handler.
targetCf = flag.String("cf", "", "TiKV's CF")
// maxValueLen bounds the length of the values produced by randValue.
maxValueLen = flag.Int("max-value-len", 512, "Maximum length of random generated values")
// keyEndWithTimestamp makes randGen overwrite the tail of each generated key
// with the current timestamp (see replaceEndWithTs).
keyEndWithTimestamp = flag.Bool("key-end-with-ts", false, "Whether the random generated keys should end with the current timestamp")
)
// createClient builds a RawKV client that reaches the cluster through the
// single PD endpoint addr, using a zero-value security configuration.
func createClient(addr string) (*tikv.RawKVClient, error) {
	pdAddrs := []string{addr}
	return tikv.NewRawKVClient(pdAddrs, config.Security{})
}
// main parses the command-line flags, decodes the hex-encoded key range and
// dispatches to the handler selected by -mode. Any failure is reported through
// log.Fatal/log.Fatalf.
func main() {
	flag.Parse()

	startKey, err := hex.DecodeString(*startKeyStr)
	if err != nil {
		// Dereference the flag so the offending input is logged, not a pointer.
		log.Fatalf("Invalid startKey: %v, err: %+v", *startKeyStr, err)
	}
	endKey, err := hex.DecodeString(*endKeyStr)
	if err != nil {
		log.Fatalf("Invalid endKey: %v, err: %+v", *endKeyStr, err)
	}
	if len(endKey) == 0 {
		log.Fatal("Empty endKey is not supported yet")
	}

	// test-rand-key only exercises the key generator, so it runs before any
	// client is created.
	if *runMode == "test-rand-key" {
		testRandKey(startKey, endKey, *keyMaxLen)
		return
	}

	client, err := createClient(*pdAddr)
	if err != nil {
		log.Fatalf("Failed to create client to %v, err: %+v", *pdAddr, err)
	}

	switch *runMode {
	case "rand-gen":
		err = randGen(client, startKey, endKey, *targetCf, *keyMaxLen, *concurrency)
	case "checksum":
		err = checksum(client, startKey, endKey, *targetCf)
	case "scan":
		err = scan(client, startKey, endKey, *targetCf)
	case "diff":
		// A separate error variable is used on purpose: declaring err with :=
		// here would shadow the outer err and hide the result of diff from
		// the check below.
		diffClient, diffErr := createClient(*diffPdAddr)
		if diffErr != nil {
			log.Fatalf("Failed to create diff client to %v, err: %+v", *diffPdAddr, diffErr)
		}
		err = diff(client, diffClient, startKey, endKey, *targetCf)
	default:
		// Without this case a mistyped mode would exit silently with success.
		log.Fatalf("Unknown mode: %q", *runMode)
	}
	if err != nil {
		log.Fatalf("Error: %+v", err)
	}
}
// randGen keeps writing batches of random key-value pairs into column family
// cf until a write fails.
//
// It starts concurrency goroutines; each one repeatedly generates a batch of
// keys with randKey(startKey, endKey, maxLen) and values with randValue, and
// writes the batch with client.BatchPutCF. When the -key-end-with-ts flag is
// set, the tail of every key is overwritten by replaceEndWithTs.
//
// It returns an error immediately if concurrency is not positive or if maxLen
// is shorter than the common prefix of startKey and endKey. Otherwise it
// blocks until the first write error and returns it; the remaining workers are
// not stopped and keep running until they fail themselves or the process
// exits.
func randGen(client *tikv.RawKVClient, startKey, endKey []byte, cf string, maxLen int, concurrency int) error {
	log.Infof("Start rand-gen from %v to %v, maxLen %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey), maxLen)
	log.Infof("Rand-gen will keep running. Please Ctrl+C to stop manually.")

	// With no worker nothing would ever be sent on errCh and the receive below
	// would block forever; a negative value would make the channel allocation
	// panic.
	if concurrency <= 0 {
		return errors.Errorf("concurrency (%v) must be positive", concurrency)
	}

	// Cannot generate shorter key than commonPrefix
	commonPrefixLen := 0
	for commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) && startKey[commonPrefixLen] == endKey[commonPrefixLen] {
		commonPrefixLen++
	}
	if maxLen < commonPrefixLen {
		return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen)
	}

	const batchSize = 32

	// One slot per worker: every worker sends at most one error, so a send
	// never blocks even after randGen has returned.
	errCh := make(chan error, concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			for {
				keys := make([][]byte, 0, batchSize)
				values := make([][]byte, 0, batchSize)
				for j := 0; j < batchSize; j++ {
					key := randKey(startKey, endKey, maxLen)
					if *keyEndWithTimestamp {
						replaceEndWithTs(key)
					}
					keys = append(keys, key)
					values = append(values, randValue())
				}
				if err := client.BatchPutCF(keys, values, cf); err != nil {
					errCh <- errors.Trace(err)
					// Stop this worker; looping on would eventually block on
					// a full errCh and leak the goroutine.
					return
				}
			}
		}()
	}

	return errors.Trace(<-errCh)
}
// replaceEndWithTs overwrites the last 8 bytes of key, in place, with the
// current Unix time in seconds encoded as a big-endian uint64. A key shorter
// than 8 bytes is left unchanged.
func replaceEndWithTs(key []byte) {
	const tsLen = 8
	tailStart := len(key) - tsLen
	if tailStart < 0 {
		return
	}
	now := uint64(time.Now().Unix())
	binary.BigEndian.PutUint64(key[tailStart:], now)
}
// testRandKey exercises randKey in an endless loop and panics with the
// hex-encoded key as soon as a generated key falls outside
// [startKey, endKey). It never returns normally.
func testRandKey(startKey, endKey []byte, maxLen int) {
	for {
		generated := randKey(startKey, endKey, maxLen)
		inRange := bytes.Compare(generated, startKey) >= 0 && bytes.Compare(generated, endKey) < 0
		if !inRange {
			panic(hex.EncodeToString(generated))
		}
	}
}
// randKey returns a random key k with startKey <= k < endKey (byte-wise
// order) and len(k) <= maxLen.
//
// The key is built one byte at a time. While the bytes produced so far still
// equal the corresponding prefix of endKey (upperUnbounded == false) the next
// byte may not exceed endKey's byte; while they still equal the prefix of
// startKey (lowerUnbounded == false) the next byte may not be smaller than
// startKey's byte. Once the key is known to be >= startKey it may stop early
// with probability 1/257 per position. A candidate that turns out to be
// outside the range is discarded and generation restarts.
//
// randKey panics when no key of at most maxLen bytes exists in
// [startKey, endKey), because retrying could never succeed.
func randKey(startKey, endKey []byte, maxLen int) []byte {
	if !keyRangeHasCandidate(startKey, endKey, maxLen) {
		panic(fmt.Sprintf("randKey: no key of length <= %d in range [%s, %s)",
			maxLen, hex.EncodeToString(startKey), hex.EncodeToString(endKey)))
	}

Retry:
	for { // Regenerate on fail
		result := make([]byte, 0, maxLen)
		upperUnbounded := false
		lowerUnbounded := false
		for i := 0; i < maxLen; i++ {
			upperBound := 256
			if !upperUnbounded {
				if i >= len(endKey) {
					// The generated key is the same as endKey which is invalid. Regenerate it.
					continue Retry
				}
				upperBound = int(endKey[i]) + 1
			}
			lowerBound := 0
			if !lowerUnbounded {
				if i >= len(startKey) {
					// The key so far equals startKey, which is itself in range.
					lowerUnbounded = true
				} else {
					lowerBound = int(startKey[i])
				}
			}
			if lowerUnbounded {
				// The key so far is already valid; occasionally stop here so
				// that keys shorter than maxLen are produced too.
				if rand.Intn(257) == 0 {
					return result
				}
			}
			value := rand.Intn(upperBound-lowerBound) + lowerBound
			if value < upperBound-1 {
				upperUnbounded = true
			}
			if value > lowerBound {
				lowerUnbounded = true
			}
			result = append(result, uint8(value))
		}

		// The loop ended because maxLen was reached, so the in-loop checks did
		// not get a chance to validate the final byte.
		if !upperUnbounded && len(result) >= len(endKey) {
			// result equals endKey, and the end of the range is exclusive.
			continue Retry
		}
		if !lowerUnbounded && len(result) < len(startKey) {
			// result is a proper prefix of startKey and therefore sorts
			// before it.
			continue Retry
		}
		return result
	}
}

// keyRangeHasCandidate reports whether at least one key of at most maxLen
// bytes lies in [startKey, endKey).
func keyRangeHasCandidate(startKey, endKey []byte, maxLen int) bool {
	if maxLen < 0 {
		return false
	}
	// smallest is the smallest key of at most maxLen bytes that is >= startKey.
	smallest := startKey
	if len(startKey) > maxLen {
		// startKey itself is too long. The next candidate is its maxLen-byte
		// prefix with trailing 0xff bytes dropped and the last remaining byte
		// incremented.
		n := maxLen
		for n > 0 && startKey[n-1] == 0xff {
			n--
		}
		if n == 0 {
			return false
		}
		smallest = append([]byte(nil), startKey[:n]...)
		smallest[n-1]++
	}
	return bytes.Compare(smallest, endKey) < 0
}
// randValue returns a random byte slice of at most *maxValueLen bytes.
//
// For every position a number in [0, 256] is drawn: 0-255 is appended as a
// byte, while 256 ends the value early. A 256 drawn for the very first byte
// is turned into 255 instead, so the value is never empty unless
// *maxValueLen is zero.
func randValue() []byte {
	limit := *maxValueLen
	buf := make([]byte, 0, limit)
	for len(buf) < limit {
		roll := rand.Intn(257)
		if roll == 256 {
			if len(buf) > 0 {
				break
			}
			roll = 255
		}
		buf = append(buf, uint8(roll))
	}
	return buf
}
// checksum scans every key-value pair of column family cf in
// [startKey, endKey) and logs a 64-bit checksum of the data.
//
// Each key and value is fed into a single CRC-64 (ECMA) digest, and after
// every pair the digest's current sum is XORed into the result. It returns
// the first scan error, or nil once the range is exhausted.
func checksum(client *tikv.RawKVClient, startKey, endKey []byte, cf string) error {
	log.Infof("Start checksum on range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
	scanner := newRawKVScanner(client, startKey, endKey, cf)
	digest := crc64.New(crc64.MakeTable(crc64.ECMA))
	var res uint64
	for {
		k, v, err := scanner.Next()
		if err != nil {
			return errors.Trace(err)
		}
		// The scanner signals the end of the range with an empty key.
		if len(k) == 0 {
			break
		}
		// hash.Hash documents that Write never returns an error.
		_, _ = digest.Write(k)
		_, _ = digest.Write(v)
		// NOTE(review): digest is never reset, so each Sum64 covers all pairs
		// scanned so far and the result depends on scan order. Confirm this
		// is intended rather than a per-pair checksum.
		res ^= digest.Sum64()
	}
	log.Infof("Checksum result: %016x", res)
	return nil
}
// scan prints every key-value pair of column family cf in
// [startKey, endKey) to standard output, hex-encoded, one pair per line.
//
// While scanning it checks that keys arrive in strictly increasing order and
// logs an error for each pair that violates it. It returns the first scan
// error, or nil once the range is exhausted.
func scan(client *tikv.RawKVClient, startKey, endKey []byte, cf string) error {
	log.Infof("Start scanning data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey))
	scanner := newRawKVScanner(client, startKey, endKey, cf)
	// prevKey holds a copy of the previously printed key. It is empty before
	// the first pair, which compares lower than any non-empty key.
	var prevKey []byte
	for {
		k, v, err := scanner.Next()
		if err != nil {
			return errors.Trace(err)
		}
		// The scanner signals the end of the range with an empty key.
		if len(k) == 0 {
			break
		}
		fmt.Printf("key: %v, value: %v\n", hex.EncodeToString(k), hex.EncodeToString(v))
		if bytes.Compare(prevKey, k) >= 0 {
			log.Errorf("Scan result is not in order. Previous key: %v, Current key: %v", hex.EncodeToString(prevKey), hex.EncodeToString(k))
		}
		// Remember the current key; without this the order check above could
		// never fire. Copy it so prevKey does not alias the scanner's buffer.
		prevKey = append(prevKey[:0], k...)
	}
	log.Infof("Finished Scanning.")
	return nil
}
// diff is the handler for the "diff" mode, which is meant to compare the data
// in [startKey, endKey) of column family cf as seen by client with the data
// seen by diffClient. It is not implemented yet: every call panics.
func diff(client, diffClient *tikv.RawKVClient, startKey, endKey []byte, cf string) error {
	panic("unimplemented")
}
// defaultScanBatchSize is the batch size newRawKVScanner gives to every
// scanner, i.e. the limit passed to each ScanCF request.
const defaultScanBatchSize = 128
// rawKVScanner iterates over the key-value pairs of one column family in a
// key range, fetching them from the client in batches and handing them out
// one at a time through Next.
type rawKVScanner struct {
// client issues the ScanCF requests.
client *tikv.RawKVClient
// batchSize is the limit passed to each ScanCF request.
batchSize int
// currentKey is the start key of the next ScanCF request. After a batch has
// been fetched it is the last returned key with a zero byte appended.
currentKey []byte
// endKey is the end key passed to every ScanCF request.
endKey []byte
// cf is the column family passed to every ScanCF request.
cf string
// bufferKeys and bufferValues hold the most recently fetched batch.
bufferKeys [][]byte
bufferValues [][]byte
// bufferCursor is the index in the buffers of the next pair Next returns.
bufferCursor int
// noMore is set once a batch came back with fewer than batchSize pairs, after
// which Next stops issuing requests.
noMore bool
}
// newRawKVScanner returns a scanner over column family cf that starts at
// startKey, uses endKey as the end of every request and fetches
// defaultScanBatchSize pairs per request. No request is sent until Next is
// called.
func newRawKVScanner(client *tikv.RawKVClient, startKey, endKey []byte, cf string) *rawKVScanner {
	s := new(rawKVScanner)
	s.client = client
	s.cf = cf
	s.currentKey = startKey
	s.endKey = endKey
	s.batchSize = defaultScanBatchSize
	return s
}
// Next returns the next key-value pair of the scan.
//
// Pairs are served from the buffered batch; when the buffer is used up a new
// batch is fetched with ScanCF, starting right after the last key returned so
// far. The end of the range is reported as (nil, nil, nil). If a request
// fails, the traced error is returned and currentKey is left unchanged.
func (s *rawKVScanner) Next() ([]byte, []byte, error) {
	if s.bufferCursor >= len(s.bufferKeys) {
		if s.noMore {
			return nil, nil, nil
		}
		s.bufferCursor = 0
		var err error
		s.bufferKeys, s.bufferValues, err = s.client.ScanCF(s.currentKey, s.endKey, s.batchSize, s.cf)
		if err != nil {
			return nil, nil, errors.Trace(err)
		}
		// A short batch means the range is exhausted; remember it so that no
		// further request is sent once this batch has been handed out.
		if len(s.bufferKeys) < s.batchSize {
			s.noMore = true
		}
		if len(s.bufferKeys) == 0 {
			return nil, nil, nil
		}
		// The next batch starts at the smallest key greater than the last one
		// received: that key followed by a zero byte. Build it in a fresh
		// slice, because appending to the buffered key could write into the
		// backing array of a slice that is handed out to the caller.
		lastKey := s.bufferKeys[len(s.bufferKeys)-1]
		nextKey := make([]byte, len(lastKey)+1)
		copy(nextKey, lastKey)
		s.currentKey = nextKey
	}
	key := s.bufferKeys[s.bufferCursor]
	value := s.bufferValues[s.bufferCursor]
	s.bufferCursor++
	return key, value, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment