Skip to content

Instantly share code, notes, and snippets.

@superfell
Created July 22, 2015 17:15
Show Gist options
  • Save superfell/1c8f81df0ccd955ed3fb to your computer and use it in GitHub Desktop.
Save superfell/1c8f81df0ccd955ed3fb to your computer and use it in GitHub Desktop.
raft election
package main
import (
"fmt"
"io"
"net"
"os"
"path/filepath"
"strconv"
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
)
// Basic app that starts up a fixed 3 node raft cluster, with a no-op FSM
// start 3 instances with a different param, e.g.
// ./raft-election 1
// ./raft-election 2
// ./raft-election 3
//
func main() {
fmt.Printf("Starting raft cluster\n")
if len(os.Args) < 2 || (os.Args[1] != "1" && os.Args[1] != "2" && os.Args[1] != "3") {
fmt.Printf("Usage ./raft-election 1|2|3\n")
os.Exit(-1)
}
execdir, err := filepath.Abs(filepath.Dir(os.Args[0]))
if err != nil {
fmt.Printf("Unable to resolve exec dir, %v : %v\n", os.Args[0], err)
os.Exit(-1)
}
datadir := execdir + "/data/" + os.Args[1]
hasExistingData := true
if _, err := os.Stat(datadir); os.IsNotExist(err) {
fmt.Printf("creating data dir %v\n", datadir)
err = os.MkdirAll(datadir, 0744)
if err != nil {
fmt.Printf("Unable to create datadir %v : %v", datadir, err)
os.Exit(-1)
}
hasExistingData = false
} else {
fmt.Printf("using existing data dir %v\n", datadir)
}
store, err := raftboltdb.NewBoltStore(datadir + "/store.bolt")
if err != nil {
fmt.Printf("Unable to initialize log %v\n", err)
os.Exit(-1)
}
ss, err := raft.NewFileSnapshotStore(datadir, 5, os.Stdout)
if err != nil {
fmt.Printf("Unable to initialize snapshots %v\n", err)
os.Exit(-1)
}
listens := []string{"127.0.0.1:5001", "127.0.0.1:5002", "127.0.0.1:5003"}
idx, _ := strconv.ParseInt(os.Args[1], 0, 16)
listen := listens[idx-1]
advertise, err := net.ResolveTCPAddr("tcp", listen)
if err != nil {
fmt.Printf("Unable to resovle listen address %v: %v", listen, err)
os.Exit(-1)
}
transport, err := raft.NewTCPTransport(listen, advertise, 5, 10*time.Second, os.Stdout)
if err != nil {
fmt.Printf("Unable to start TCP Transport: %v\n", err)
os.Exit(-1)
}
fmt.Printf("Started TCP Transport on %v\n", advertise)
ps := raft.NewJSONPeers(datadir, transport)
if !hasExistingData {
ps.SetPeers(listens)
}
raft, err := raft.NewRaft(raft.DefaultConfig(), &fsm{}, store, store, ss, ps, transport)
if err != nil {
fmt.Printf("Unable to start raft %v\n", err)
os.Exit(-1)
}
for {
isLeader := <-raft.LeaderCh()
fmt.Printf("node %v leader change now %v\n", raft, isLeader)
if isLeader {
payload := make([]byte, 5)
f := raft.Apply(payload, time.Second*10)
fmt.Printf("raft.Apply returned %v:%v:%v\n\n", f.Error(), f.Index(), f.Response())
}
}
}
type fsm struct {
}
type snapshot struct {
}
func (f *fsm) Apply(log *raft.Log) interface{} {
fmt.Printf("fsm.Apply %v:%v\n", log.Term, log.Index)
return nil
}
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
return &snapshot{}, nil
}
func (f *fsm) Restore(io.ReadCloser) error {
return nil
}
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
return nil
}
func (s *snapshot) Release() {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment