Last active
April 7, 2020 05:37
-
-
Save MyonKeminta/6a9d60111bbe2f6f96b091ac474d2fc4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"encoding/binary" | |
"encoding/hex" | |
"flag" | |
"fmt" | |
"github.com/pingcap/errors" | |
"github.com/pingcap/tidb/config" | |
"github.com/pingcap/tidb/store/tikv" | |
"github.com/prometheus/common/log" | |
"hash/crc64" | |
"math/rand" | |
"time" | |
) | |
// Modes supported by this tool (selected with -mode):
//   - rand-gen: keep writing random KV pairs into [start-key, end-key)
//   - checksum: scan the range and log a checksum of its contents
//   - scan: print every KV pair in the range
//   - diff: compare the range against the cluster at -diff-pd-addr
//   - test-rand-key: self-test the random key generator (no cluster needed)
//
// Ref TIDB: https://github.com/pingcap/tidb/commit/7a7b2d82a70858544d7b711c9cb8499bda5d6eb7
var (
	pdAddr              = flag.String("pd-addr", "127.0.0.1:2379", "Address of PD")
	runMode             = flag.String("mode", "", "Mode. One of 'rand-gen', 'checksum', 'scan', 'diff' and 'test-rand-key'")
	startKeyStr         = flag.String("start-key", "", "Start key in hex")
	endKeyStr           = flag.String("end-key", "", "End key in hex")
	keyMaxLen           = flag.Int("key-max-len", 32, "Max length of keys for rand-gen mode")
	diffPdAddr          = flag.String("diff-pd-addr", "127.0.0.1:2380", "PD address of another client to diff data against the current client. Only for diff mode.")
	concurrency         = flag.Int("concurrency", 32, "Concurrency to run rand-gen")
	targetCf            = flag.String("cf", "", "TiKV's CF")
	maxValueLen         = flag.Int("max-value-len", 512, "Maximum length of random generated values")
	keyEndWithTimestamp = flag.Bool("key-end-with-ts", false, "Whether the random generated keys should end with the current timestamp")
)
func createClient(addr string) (*tikv.RawKVClient, error) { | |
cli, err := tikv.NewRawKVClient([]string{addr}, config.Security{}) | |
return cli, err | |
} | |
func main() { | |
flag.Parse() | |
startKey, err := hex.DecodeString(*startKeyStr) | |
if err != nil { | |
log.Fatalf("Invalid startKey: %v, err: %+v", startKeyStr, err) | |
} | |
endKey, err := hex.DecodeString(*endKeyStr) | |
if err != nil { | |
log.Fatalf("Invalid endKey: %v, err: %+v", endKeyStr, err) | |
} | |
if len(endKey) == 0 { | |
log.Fatal("Empty endKey is not supported yet") | |
} | |
if *runMode == "test-rand-key" { | |
testRandKey(startKey, endKey, *keyMaxLen) | |
return | |
} | |
client, err := createClient(*pdAddr) | |
if err != nil { | |
log.Fatalf("Failed to create client to %v, err: %+v", *pdAddr, err) | |
} | |
switch *runMode { | |
case "rand-gen": | |
err = randGen(client, startKey, endKey, *targetCf, *keyMaxLen, *concurrency) | |
case "checksum": | |
err = checksum(client, startKey, endKey, *targetCf) | |
case "scan": | |
err = scan(client, startKey, endKey, *targetCf) | |
case "diff": | |
diffClient, err := createClient(*diffPdAddr) | |
if err != nil { | |
log.Fatalf("Failed to create diff client to %v, err: %+v", *diffPdAddr, err) | |
} | |
err = diff(client, diffClient, startKey, endKey, *targetCf) | |
} | |
if err != nil { | |
log.Fatalf("Error: %+v", err) | |
} | |
} | |
func randGen(client *tikv.RawKVClient, startKey, endKey []byte, cf string, maxLen int, concurrency int) error { | |
log.Infof("Start rand-gen from %v to %v, maxLen %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey), maxLen) | |
log.Infof("Rand-gen will keep running. Please Ctrl+C to stop manually.") | |
// Cannot generate shorter key than commonPrefix | |
commonPrefixLen := 0 | |
for ; commonPrefixLen < len(startKey) && commonPrefixLen < len(endKey) && startKey[commonPrefixLen] == endKey[commonPrefixLen]; commonPrefixLen++ { | |
continue | |
} | |
if maxLen < commonPrefixLen { | |
return errors.Errorf("maxLen (%v) < commonPrefixLen (%v)", maxLen, commonPrefixLen) | |
} | |
const batchSize = 32 | |
errCh := make(chan error, concurrency) | |
for i := 0; i < concurrency; i++ { | |
go func() { | |
for { | |
keys := make([][]byte, 0, batchSize) | |
values := make([][]byte, 0, batchSize) | |
for i := 0; i < batchSize; i++ { | |
key := randKey(startKey, endKey, maxLen) | |
if *keyEndWithTimestamp { | |
replaceEndWithTs(key) | |
} | |
keys = append(keys, key) | |
value := randValue() | |
values = append(values, value) | |
} | |
err := client.BatchPutCF(keys, values, cf) | |
if err != nil { | |
errCh <- errors.Trace(err) | |
} | |
} | |
}() | |
} | |
err := <-errCh | |
if err != nil { | |
return errors.Trace(err) | |
} | |
return nil | |
} | |
// replaceEndWithTs overwrites, in place, the last 8 bytes of key with the
// current Unix time in seconds, encoded big-endian. Keys shorter than 8 bytes
// are left untouched.
func replaceEndWithTs(key []byte) {
	const tsLen = 8
	n := len(key)
	if n >= tsLen {
		binary.BigEndian.PutUint64(key[n-tsLen:], uint64(time.Now().Unix()))
	}
}
func testRandKey(startKey, endKey []byte, maxLen int) { | |
for { | |
k := randKey(startKey, endKey, maxLen) | |
if bytes.Compare(k, startKey) < 0 || bytes.Compare(k, endKey) >= 0 { | |
panic(hex.EncodeToString(k)) | |
} | |
} | |
} | |
// randKey returns a random key k with startKey <= k < endKey and
// len(k) <= maxLen.
//
// Bytes are drawn one position at a time. While the key built so far still
// equals the corresponding prefix of startKey (or endKey), the byte at that
// position is constrained so the key cannot drop below startKey (or reach past
// endKey). Once the key is known to be >= startKey, generation may stop early
// with probability 1/257 at each position.
//
// Candidates that turn out to be out of range are discarded and regenerated.
// Consequently, if no key of length <= maxLen exists in the range, randKey
// never returns. startKey must be less than endKey; otherwise rand.Intn may
// panic or the function may loop forever.
func randKey(startKey, endKey []byte, maxLen int) []byte {
Retry:
	for { // Regenerate on fail
		result := make([]byte, 0, maxLen)
		// upperUnbounded: result already sorts below endKey, so any bytes may follow.
		upperUnbounded := false
		// lowerUnbounded: result already sorts >= startKey, so any bytes may follow.
		lowerUnbounded := false
		for i := 0; i < maxLen; i++ {
			upperBound := 256
			if !upperUnbounded {
				if i >= len(endKey) {
					// The generated key is the same as endKey which is invalid. Regenerate it.
					continue Retry
				}
				upperBound = int(endKey[i]) + 1
			}
			lowerBound := 0
			if !lowerUnbounded {
				if i >= len(startKey) {
					// result == startKey, which is itself a valid key.
					lowerUnbounded = true
				} else {
					lowerBound = int(startKey[i])
				}
			}
			// Here result is < endKey (a strict prefix of it, or already below
			// it), so once it is also >= startKey it may be returned as is.
			if lowerUnbounded && rand.Intn(257) == 0 {
				return result
			}
			value := lowerBound + rand.Intn(upperBound-lowerBound)
			if value < upperBound-1 {
				upperUnbounded = true
			}
			if value > lowerBound {
				lowerUnbounded = true
			}
			result = append(result, uint8(value))
		}
		// The loop filled all maxLen bytes. result may still be out of range:
		//  - a truncated copy of a longer startKey sorts below startKey;
		//  - an exact copy of endKey is excluded from the range.
		if !lowerUnbounded && len(result) < len(startKey) {
			continue Retry
		}
		if !upperUnbounded && len(result) == len(endKey) {
			continue Retry
		}
		return result
	}
}
func randValue() []byte { | |
result := make([]byte, 0, *maxValueLen) | |
for i := 0; i < *maxValueLen; i++ { | |
value := rand.Intn(257) | |
if value == 256 { | |
if i > 0 { | |
return result | |
} | |
value-- | |
} | |
result = append(result, uint8(value)) | |
} | |
return result | |
} | |
func checksum(client *tikv.RawKVClient, startKey, endKey []byte, cf string) error { | |
log.Infof("Start checkcum on range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) | |
scanner := newRawKVScanner(client, startKey, endKey, cf) | |
digest := crc64.New(crc64.MakeTable(crc64.ECMA)) | |
var res uint64 | |
for { | |
k, v, err := scanner.Next() | |
if err != nil { | |
return errors.Trace(err) | |
} | |
if len(k) == 0 { | |
break | |
} | |
_, _ = digest.Write(k) | |
_, _ = digest.Write(v) | |
res ^= digest.Sum64() | |
} | |
log.Infof("Checksum result: %016x", res) | |
return nil | |
} | |
func scan(client *tikv.RawKVClient, startKey, endKey []byte, cf string) error { | |
log.Infof("Start scanning data in range %v to %v", hex.EncodeToString(startKey), hex.EncodeToString(endKey)) | |
scanner := newRawKVScanner(client, startKey, endKey, cf) | |
var key []byte | |
for { | |
k, v, err := scanner.Next() | |
if err != nil { | |
return errors.Trace(err) | |
} | |
if len(k) == 0 { | |
break | |
} | |
fmt.Printf("key: %v, value: %v\n", hex.EncodeToString(k), hex.EncodeToString(v)) | |
if bytes.Compare(key, k) >= 0 { | |
log.Errorf("Scan result is not in order. Previous key: %v, Current key: %v", hex.EncodeToString(key), hex.EncodeToString(k)) | |
} | |
} | |
log.Infof("Finished Scanning.") | |
return nil | |
} | |
func diff(client *tikv.RawKVClient, diffClient *tikv.RawKVClient, startKey, endKey []byte, cf string) error { | |
panic("unimplemented") | |
} | |
const defaultScanBatchSize = 128 | |
type rawKVScanner struct { | |
client *tikv.RawKVClient | |
batchSize int | |
currentKey []byte | |
endKey []byte | |
cf string | |
bufferKeys [][]byte | |
bufferValues [][]byte | |
bufferCursor int | |
noMore bool | |
} | |
func newRawKVScanner(client *tikv.RawKVClient, startKey, endKey []byte, cf string) *rawKVScanner { | |
return &rawKVScanner{ | |
client: client, | |
batchSize: defaultScanBatchSize, | |
currentKey: startKey, | |
endKey: endKey, | |
cf: cf, | |
noMore: false, | |
} | |
} | |
func (s *rawKVScanner) Next() ([]byte, []byte, error) { | |
if s.bufferCursor >= len(s.bufferKeys) { | |
if s.noMore { | |
return nil, nil, nil | |
} | |
s.bufferCursor = 0 | |
batchSize := s.batchSize | |
var err error | |
s.bufferKeys, s.bufferValues, err = s.client.ScanCF(s.currentKey, s.endKey, batchSize, s.cf) | |
if err != nil { | |
return nil, nil, errors.Trace(err) | |
} | |
if len(s.bufferKeys) < batchSize { | |
s.noMore = true | |
} | |
if len(s.bufferKeys) == 0 { | |
return nil, nil, nil | |
} | |
s.currentKey = append(s.bufferKeys[len(s.bufferKeys)-1], 0) | |
} | |
key := s.bufferKeys[s.bufferCursor] | |
value := s.bufferValues[s.bufferCursor] | |
s.bufferCursor++ | |
return key, value, nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment