Created
July 22, 2015 17:15
-
-
Save superfell/1c8f81df0ccd955ed3fb to your computer and use it in GitHub Desktop.
raft election
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"io" | |
"net" | |
"os" | |
"path/filepath" | |
"strconv" | |
"time" | |
"github.com/hashicorp/raft" | |
"github.com/hashicorp/raft-boltdb" | |
) | |
// Basic app that starts up a fixed 3 node raft cluster, with a no-op FSM | |
// start 3 instances with a different param, e.g. | |
// ./raft-election 1 | |
// ./raft-election 2 | |
// ./raft-election 3 | |
// | |
func main() { | |
fmt.Printf("Starting raft cluster\n") | |
if len(os.Args) < 2 || (os.Args[1] != "1" && os.Args[1] != "2" && os.Args[1] != "3") { | |
fmt.Printf("Usage ./raft-election 1|2|3\n") | |
os.Exit(-1) | |
} | |
execdir, err := filepath.Abs(filepath.Dir(os.Args[0])) | |
if err != nil { | |
fmt.Printf("Unable to resolve exec dir, %v : %v\n", os.Args[0], err) | |
os.Exit(-1) | |
} | |
datadir := execdir + "/data/" + os.Args[1] | |
hasExistingData := true | |
if _, err := os.Stat(datadir); os.IsNotExist(err) { | |
fmt.Printf("creating data dir %v\n", datadir) | |
err = os.MkdirAll(datadir, 0744) | |
if err != nil { | |
fmt.Printf("Unable to create datadir %v : %v", datadir, err) | |
os.Exit(-1) | |
} | |
hasExistingData = false | |
} else { | |
fmt.Printf("using existing data dir %v\n", datadir) | |
} | |
store, err := raftboltdb.NewBoltStore(datadir + "/store.bolt") | |
if err != nil { | |
fmt.Printf("Unable to initialize log %v\n", err) | |
os.Exit(-1) | |
} | |
ss, err := raft.NewFileSnapshotStore(datadir, 5, os.Stdout) | |
if err != nil { | |
fmt.Printf("Unable to initialize snapshots %v\n", err) | |
os.Exit(-1) | |
} | |
listens := []string{"127.0.0.1:5001", "127.0.0.1:5002", "127.0.0.1:5003"} | |
idx, _ := strconv.ParseInt(os.Args[1], 0, 16) | |
listen := listens[idx-1] | |
advertise, err := net.ResolveTCPAddr("tcp", listen) | |
if err != nil { | |
fmt.Printf("Unable to resovle listen address %v: %v", listen, err) | |
os.Exit(-1) | |
} | |
transport, err := raft.NewTCPTransport(listen, advertise, 5, 10*time.Second, os.Stdout) | |
if err != nil { | |
fmt.Printf("Unable to start TCP Transport: %v\n", err) | |
os.Exit(-1) | |
} | |
fmt.Printf("Started TCP Transport on %v\n", advertise) | |
ps := raft.NewJSONPeers(datadir, transport) | |
if !hasExistingData { | |
ps.SetPeers(listens) | |
} | |
raft, err := raft.NewRaft(raft.DefaultConfig(), &fsm{}, store, store, ss, ps, transport) | |
if err != nil { | |
fmt.Printf("Unable to start raft %v\n", err) | |
os.Exit(-1) | |
} | |
for { | |
isLeader := <-raft.LeaderCh() | |
fmt.Printf("node %v leader change now %v\n", raft, isLeader) | |
if isLeader { | |
payload := make([]byte, 5) | |
f := raft.Apply(payload, time.Second*10) | |
fmt.Printf("raft.Apply returned %v:%v:%v\n\n", f.Error(), f.Index(), f.Response()) | |
} | |
} | |
} | |
// fsm is the no-op state machine passed to raft.NewRaft; it holds no state.
type fsm struct{}
// snapshot is the empty snapshot value returned by fsm.Snapshot.
type snapshot struct{}
func (f *fsm) Apply(log *raft.Log) interface{} { | |
fmt.Printf("fsm.Apply %v:%v\n", log.Term, log.Index) | |
return nil | |
} | |
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) { | |
return &snapshot{}, nil | |
} | |
func (f *fsm) Restore(io.ReadCloser) error { | |
return nil | |
} | |
func (s *snapshot) Persist(sink raft.SnapshotSink) error { | |
return nil | |
} | |
func (s *snapshot) Release() { | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment