Skip to content

Instantly share code, notes, and snippets.

@disksing
Created July 13, 2018 13:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save disksing/502e303a296db9af8ae6325293e59528 to your computer and use it in GitHub Desktop.
Save disksing/502e303a296db9af8ae6325293e59528 to your computer and use it in GitHub Desktop.
PD heartbeat bench tool
package main
import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"time"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"google.golang.org/grpc"
)
var (
pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address")
storeCount = flag.Int("store", 200, "store count")
regionCount = flag.Uint64("region", 10000, "region count")
keyLen = flag.Int("keylen", 256, "key length")
replica = flag.Int("replica", 3, "replica count")
)
var clusterID uint64
func newClient() pdpb.PDClient {
cc, err := grpc.Dial(*pdAddr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
}
return pdpb.NewPDClient(cc)
}
func initClusterID(cli pdpb.PDClient) {
res, err := cli.GetMembers(context.TODO(), &pdpb.GetMembersRequest{})
if err != nil {
log.Fatal(err)
}
clusterID = res.GetHeader().GetClusterId()
log.Println("ClusterID:", clusterID)
}
func header() *pdpb.RequestHeader {
return &pdpb.RequestHeader{
ClusterId: clusterID,
}
}
func bootstrap(cli pdpb.PDClient) {
isBootstrapped, err := cli.IsBootstrapped(context.TODO(), &pdpb.IsBootstrappedRequest{Header: header()})
if err != nil {
log.Fatal(err)
}
if isBootstrapped.GetBootstrapped() {
log.Println("already bootstrapped")
return
}
store := &metapb.Store{
Id: 1,
Address: fmt.Sprintf("localhost:%d", 1),
}
region := &metapb.Region{
Id: 1,
Peers: []*metapb.Peer{&metapb.Peer{StoreId: 1, Id: 1}},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
req := &pdpb.BootstrapRequest{
Header: header(),
Store: store,
Region: region,
}
_, err = cli.Bootstrap(context.TODO(), req)
if err != nil {
log.Fatal(err)
}
log.Println("bootstrapped")
}
func putStores(cli pdpb.PDClient) {
for i := uint64(1); i <= uint64(*storeCount); i++ {
store := &metapb.Store{
Id: i,
Address: fmt.Sprintf("localhost:%d", i),
}
_, err := cli.PutStore(context.TODO(), &pdpb.PutStoreRequest{Header: header(), Store: store})
if err != nil {
log.Fatal(err)
}
}
}
func newStartKey(id uint64) []byte {
k := make([]byte, *keyLen)
copy(k, []byte(fmt.Sprintf("%010d", id)))
return k
}
func newEndKey(id uint64) []byte {
k := newStartKey(id)
k[len(k)-1]++
return k
}
type Store struct {
id uint64
}
func (s *Store) Run() {
cli := newClient()
stream, err := cli.RegionHeartbeat(context.TODO())
if err != nil {
log.Fatal(err)
}
var peers []*metapb.Peer
for i := 0; i < *replica; i++ {
storeID := s.id + uint64(i)
if storeID > uint64(*storeCount) {
storeID -= uint64(*storeCount)
}
peers = append(peers, &metapb.Peer{Id: uint64(i + 1), StoreId: storeID})
}
for {
log.Printf("store %v start heartbeat", s.id)
startTime := time.Now()
for regionID := s.id; regionID <= *regionCount; regionID += uint64(*storeCount) {
meta := &metapb.Region{
Id: regionID,
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 2, Version: 2},
StartKey: newStartKey(regionID),
EndKey: newEndKey(regionID),
}
err = stream.Send(&pdpb.RegionHeartbeatRequest{
Header: header(),
Region: meta,
Leader: peers[0],
})
if err != nil {
log.Fatal(err)
}
}
log.Printf("store %v finish heartbeat, cost time: %v", s.id, time.Since(startTime))
time.Sleep(time.Duration(rand.Intn(30)+45) * time.Second)
}
}
func main() {
flag.Parse()
cli := newClient()
initClusterID(cli)
bootstrap(cli)
putStores(cli)
log.Println("finish put stores")
for i := 1; i <= *storeCount; i++ {
s := Store{id: uint64(i)}
go s.Run()
}
select {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment