Skip to content

Instantly share code, notes, and snippets.

@siddontang
Created March 24, 2016 11:08
Show Gist options
  • Save siddontang/f993b353de45610bbca4 to your computer and use it in GitHub Desktop.
Save siddontang/f993b353de45610bbca4 to your computer and use it in GitHub Desktop.
A simple raft command line tool
package main
import (
"fmt"
"net"
"os"
"path"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/golang/protobuf/proto"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/raft_cmdpb"
"github.com/pingcap/kvproto/pkg/raft_serverpb"
"github.com/pingcap/pd/server"
"github.com/pingcap/pd/util"
"github.com/spf13/cobra"
"github.com/twinj/uuid"
)
const (
cliName = "raftctl"
cliDescription = "A simple command line client for raft server"
)
var (
rootCmd = &cobra.Command{
Use: cliName,
Short: cliDescription,
}
)
var (
raftAddr string
etcdEndpoints []string
pdRootPath string
clusterID uint64
client *clientv3.Client
)
func init() {
rootCmd.PersistentFlags().StringVar(&raftAddr, "raft", "127.0.0.1:20160", "set the raft server address")
rootCmd.PersistentFlags().StringSliceVar(&etcdEndpoints, "etcd", []string{"127.0.0.1:2379"}, "Etcd endpoints")
rootCmd.PersistentFlags().StringVar(&pdRootPath, "pd", "/pd", "pd root path in etcd")
rootCmd.PersistentFlags().Uint64Var(&clusterID, "cluster-id", 1, "default cluster id")
initKVCommand()
cobra.EnablePrefixMatching = true
}
func perror(err error) {
if err != nil {
fmt.Println(err)
os.Exit(-1)
}
}
func main() {
var err error
client, err = clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
DialTimeout: 3 * time.Second,
})
perror(err)
err = rootCmd.Execute()
perror(err)
}
func initKVCommand() {
getCmd := &cobra.Command{
Use: "get <key>",
Short: "get the value with given key",
Run: getCommandFunc,
}
rootCmd.AddCommand(getCmd)
putCmd := &cobra.Command{
Use: "put <key> <value>",
Short: "set key with value",
Run: putCommandFunc,
}
rootCmd.AddCommand(putCmd)
seekCmd := &cobra.Command{
Use: "seek <key>",
Short: "seek the key >= given key",
Run: seekCommandFunc,
}
rootCmd.AddCommand(seekCmd)
deleteCmd := &cobra.Command{
Use: "delete <key>",
Short: "delete the key",
Run: deleteCommandFunc,
}
rootCmd.AddCommand(deleteCmd)
}
func getCommandFunc(cmd *cobra.Command, args []string) {
key := args[0]
region := getRegion(key)
leader := getRegionLeader(region)
req := &raft_cmdpb.RaftCmdRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: region.RegionId,
Peer: leader,
},
Requests: []*raft_cmdpb.Request{
&raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Get.Enum(),
Get: &raft_cmdpb.GetRequest{
Key: []byte(key),
},
},
},
}
resp := sendRaftCommnd(req)
fmt.Printf("%s\n", resp.Responses[0].Get)
}
func putCommandFunc(cmd *cobra.Command, args []string) {
key := args[0]
value := args[1]
region := getRegion(key)
leader := getRegionLeader(region)
req := &raft_cmdpb.RaftCmdRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: region.RegionId,
Peer: leader,
},
Requests: []*raft_cmdpb.Request{
&raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Put.Enum(),
Put: &raft_cmdpb.PutRequest{
Key: []byte(key),
Value: []byte(value),
},
},
},
}
sendRaftCommnd(req)
}
func seekCommandFunc(cmd *cobra.Command, args []string) {
key := args[0]
region := getRegion(key)
leader := getRegionLeader(region)
req := &raft_cmdpb.RaftCmdRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: region.RegionId,
Peer: leader,
},
Requests: []*raft_cmdpb.Request{
&raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Seek.Enum(),
Seek: &raft_cmdpb.SeekRequest{
Key: []byte(key),
},
},
},
}
resp := sendRaftCommnd(req)
fmt.Printf("%s\n", resp.Responses[0].Seek)
}
func deleteCommandFunc(cmd *cobra.Command, args []string) {
key := args[0]
region := getRegion(key)
leader := getRegionLeader(region)
req := &raft_cmdpb.RaftCmdRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: region.RegionId,
Peer: leader,
},
Requests: []*raft_cmdpb.Request{
&raft_cmdpb.Request{
CmdType: raft_cmdpb.CmdType_Delete.Enum(),
Delete: &raft_cmdpb.DeleteRequest{
Key: []byte(key),
},
},
},
}
sendRaftCommnd(req)
}
func getPDLeader() string {
leader, err := server.GetLeader(client, path.Join(pdRootPath, "leader"))
perror(err)
return *leader.Addr
}
func getRegion(key string) *metapb.Region {
addr := getPDLeader()
conn, err := net.Dial("tcp", addr)
perror(err)
defer conn.Close()
req := &pdpb.Request{
Header: &pdpb.RequestHeader{
ClusterId: proto.Uint64(clusterID),
},
CmdType: pdpb.CommandType_GetMeta.Enum(),
GetMeta: &pdpb.GetMetaRequest{
MetaType: pdpb.MetaType_RegionType.Enum(),
RegionKey: []byte(key),
},
}
err = util.WriteMessage(conn, 0, req)
perror(err)
var resp pdpb.Response
_, err = util.ReadMessage(conn, &resp)
perror(err)
return resp.GetMeta.Region
}
func getNodeAddr(nodeID uint64) string {
addr := getPDLeader()
conn, err := net.Dial("tcp", addr)
perror(err)
defer conn.Close()
req := &pdpb.Request{
Header: &pdpb.RequestHeader{
ClusterId: proto.Uint64(clusterID),
},
CmdType: pdpb.CommandType_GetMeta.Enum(),
GetMeta: &pdpb.GetMetaRequest{
MetaType: pdpb.MetaType_NodeType.Enum(),
NodeId: proto.Uint64(nodeID),
},
}
err = util.WriteMessage(conn, 0, req)
perror(err)
var resp pdpb.Response
_, err = util.ReadMessage(conn, &resp)
perror(err)
return resp.GetMeta.Node.GetAddress()
}
func getRegionLeader(region *metapb.Region) *metapb.Peer {
regionID := region.GetRegionId()
peers := region.GetPeers()
req := &raft_cmdpb.RaftCmdRequest{
Header: &raft_cmdpb.RaftRequestHeader{
RegionId: proto.Uint64(regionID),
},
StatusRequest: &raft_cmdpb.StatusRequest{
CmdType: raft_cmdpb.StatusCmdType_RegionLeader.Enum(),
RegionLeader: &raft_cmdpb.RegionLeaderRequest{},
},
}
for _, peer := range peers {
req.Header.Peer = peer
resp := sendRaftCommnd(req)
leader := resp.StatusResponse.RegionLeader.Leader
if leader != nil {
return leader
}
}
return nil
}
func sendRaftCommnd(req *raft_cmdpb.RaftCmdRequest) *raft_cmdpb.RaftCmdResponse {
req.Header.Uuid = uuid.NewV4().Bytes()
addr := getNodeAddr(req.Header.Peer.GetNodeId())
conn, err := net.Dial("tcp", addr)
perror(err)
defer conn.Close()
err = util.WriteMessage(conn, 0, &raft_serverpb.Message{
MsgType: raft_serverpb.MessageType_Cmd.Enum(),
CmdReq: req,
})
perror(err)
var resp raft_serverpb.Message
_, err = util.ReadMessage(conn, &resp)
perror(err)
return resp.CmdResp
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment