Skip to content

Instantly share code, notes, and snippets.

@connorholt
Forked from mjpitz/memberlist-serf-raft.go
Created January 18, 2022 15:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save connorholt/c3718ff09866049d8777503b4e012743 to your computer and use it in GitHub Desktop.
Save connorholt/c3718ff09866049d8777503b4e012743 to your computer and use it in GitHub Desktop.
A basic example of how to use memberlist, serf, and raft together. The last example I found had a few bugs that needed to be worked out.
package main
import (
"fmt"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
"github.com/hashicorp/serf/serf"
"io"
"log"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
)
// Storage layout and tuning values used when building the raft stores.
const (
// raftState is the sub-directory, joined onto the data directory, that holds
// the BoltDB file and the file snapshots.
raftState = "raft/"
// snapshotsRetained is the retain count handed to the file snapshot store.
snapshotsRetained = 2
// raftLogCacheSize is the capacity handed to the log cache that wraps the
// BoltDB log store.
raftLogCacheSize = 512
)
// obtainStores creates (if necessary) the raft state directory under dataDir
// and opens the three stores raft needs:
//
//   - a BoltDB-backed store in "raft.db", returned as the stable store,
//   - a log cache of raftLogCacheSize entries wrapping that same BoltDB store,
//     returned as the log store,
//   - a file snapshot store retaining snapshotsRetained snapshots and logging
//     to logOutput.
//
// On failure every returned store is nil, and the BoltDB store is closed again
// if it had already been opened, so no file handle is leaked. The caller owns
// the returned stable store and is responsible for closing it.
func obtainStores(dataDir string, logOutput io.Writer) (raft.LogStore, raft.StableStore, raft.SnapshotStore, error) {
	path := filepath.Join(dataDir, raftState)
	if err := os.MkdirAll(path, 0755); err != nil {
		return nil, nil, nil, fmt.Errorf("creating raft state directory %q: %w", path, err)
	}

	stable, err := raftboltdb.NewBoltStore(filepath.Join(path, "raft.db"))
	if err != nil {
		return nil, nil, nil, fmt.Errorf("opening bolt store: %w", err)
	}

	logs, err := raft.NewLogCache(raftLogCacheSize, stable)
	if err != nil {
		// Best-effort cleanup: the cache error is the one worth reporting.
		_ = stable.Close()
		return nil, nil, nil, fmt.Errorf("creating log cache: %w", err)
	}

	snaps, err := raft.NewFileSnapshotStore(path, snapshotsRetained, logOutput)
	if err != nil {
		// Best-effort cleanup: the snapshot store error is the one worth reporting.
		_ = stable.Close()
		return nil, nil, nil, fmt.Errorf("creating snapshot store: %w", err)
	}

	return logs, stable, snaps, nil
}
func mainE() error {
// assumed to run on the same port, just different hosts
peers := []string{
"localhost",
}
bindAddress := "localhost"
serfPort := 7946
raftPort := 1235
dataDir := "data"
logOutput := os.Stdout
serfCh := make(chan serf.Event, 16)
defer close(serfCh)
memberlistConfig := memberlist.DefaultLANConfig()
memberlistConfig.BindPort = serfPort
memberlistConfig.LogOutput = logOutput
serfConfig := serf.DefaultConfig()
serfConfig.EventCh = serfCh
serfConfig.MemberlistConfig = memberlistConfig
serfConfig.LogOutput = logOutput
s, err := serf.Create(serfConfig)
if err != nil {
return err
}
serfPeers := make([]string, len(peers))
for i, peer := range peers {
serfPeers[i] = fmt.Sprintf("%s:%d", peer, serfPort)
}
if len(serfPeers) > 0 {
_, err := s.Join(serfPeers, false)
if err != nil {
return err
}
}
raftAddress := fmt.Sprintf("%s:%d", bindAddress, raftPort)
transport, err := raft.NewTCPTransport(raftAddress, nil, 3, 10*time.Second, logOutput)
if err != nil {
return err
}
logs, stable, snaps, err := obtainStores(dataDir, logOutput)
if err != nil {
return err
}
if closer, ok := stable.(io.Closer); ok {
defer func() {
err := closer.Close()
if err != nil {
log.Fatal(err)
}
}()
}
hasState, err := raft.HasExistingState(logs, stable, snaps)
if err != nil {
return err
}
raftConfig := raft.DefaultConfig()
raftConfig.LocalID = raft.ServerID(raftAddress)
if !hasState {
raftPeers := make([]raft.Server, len(peers))
for i, peer := range peers {
p := fmt.Sprintf("%s:%d", peer, raftPort)
raftPeers[i] = raft.Server{
Suffrage: raft.Voter,
ID: raft.ServerID(p),
Address: raft.ServerAddress(p),
}
}
bootstrapConfig := raft.Configuration{
Servers: raftPeers,
}
err := raft.BootstrapCluster(raftConfig, logs, stable, snaps, transport, bootstrapConfig)
if err != nil {
return err
}
}
r, err := raft.NewRaft(raftConfig, &fsm{}, logs, stable, snaps, transport)
if err != nil {
return err
}
defer func(){
future := r.Shutdown()
if err := future.Error(); err != nil {
log.Fatal(err)
}
}()
done := false
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
ticker := time.NewTicker(3 * time.Second)
for !done {
select {
case <-ch:
done = true
case <-ticker.C:
future := r.VerifyLeader()
if err := future.Error(); err != nil {
fmt.Println("node is a follower")
} else {
fmt.Println("node is a leader")
}
case ev := <-serfCh:
leader := r.VerifyLeader()
if memberEvent, ok := ev.(serf.MemberEvent); ok {
for _, member := range memberEvent.Members {
eventType := memberEvent.EventType()
changedPeer := fmt.Sprintf("%s:%d", member.Addr.String(), raftPort)
if leader.Error() != nil {
continue
}
if eventType == serf.EventMemberJoin {
future := r.AddVoter(raft.ServerID(changedPeer), raft.ServerAddress(changedPeer), 0, 0)
if err := future.Error(); err != nil {
// log
}
} else if eventType == serf.EventMemberLeave || eventType == serf.EventMemberFailed {
future := r.RemoveServer(raft.ServerID(changedPeer), 0, 0)
if err := future.Error(); err != nil {
// log
}
}
}
}
}
}
fmt.Println("shutting down")
return nil
}
// main runs mainE and, if it fails, logs the error and exits with a non-zero
// status. A startup failure is an ordinary runtime error, so it is reported as
// a log line rather than as a panic with a stack trace.
func main() {
	if err := mainE(); err != nil {
		log.Fatal(err)
	}
}
// fsm is a placeholder raft state machine that keeps no state.
type fsm struct{}

// Apply ignores the committed log entry and returns nil.
func (*fsm) Apply(l *raft.Log) interface{} {
	return nil
}

// Snapshot returns an empty snapshot. Raft invokes Persist and Release on the
// value returned here, so a nil snapshot paired with a nil error would fail
// with a nil dereference the first time a snapshot is taken.
func (*fsm) Snapshot() (raft.FSMSnapshot, error) {
	return emptySnapshot{}, nil
}

// Restore ignores the snapshot contents because there is no state to rebuild.
func (*fsm) Restore(read io.ReadCloser) error {
	return nil
}

// emptySnapshot is the snapshot of the stateless fsm: it writes no data.
type emptySnapshot struct{}

// Persist closes the sink without writing anything and returns the result of
// that close.
func (emptySnapshot) Persist(sink raft.SnapshotSink) error {
	return sink.Close()
}

// Release has nothing to free.
func (emptySnapshot) Release() {}

// Compile-time checks that the types satisfy the raft interfaces.
var (
	_ raft.FSM         = (*fsm)(nil)
	_ raft.FSMSnapshot = emptySnapshot{}
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment