Skip to content

Instantly share code, notes, and snippets.

@siddontang
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save siddontang/3be3d8555b79ce51d742 to your computer and use it in GitHub Desktop.
Save siddontang/3be3d8555b79ce51d742 to your computer and use it in GitHub Desktop.
raft example
package main
import (
"flag"
"fmt"
"github.com/hashicorp/raft"
"github.com/ugorji/go/codec"
"io"
"net"
"net/http"
"os"
"sync"
)
var httpBasePort int = 8000
var raftBasePort int = 7000
var n = flag.Int("n", 0, "")
type MockFSM struct {
sync.Mutex
logs [][]byte
}
type MockSnapshot struct {
logs [][]byte
maxIndex int
}
func (m *MockFSM) Apply(log *raft.Log) interface{} {
m.Lock()
defer m.Unlock()
m.logs = append(m.logs, log.Data)
return len(m.logs)
}
func (m *MockFSM) Snapshot() (raft.FSMSnapshot, error) {
m.Lock()
defer m.Unlock()
return &MockSnapshot{m.logs, len(m.logs)}, nil
}
func (m *MockFSM) Restore(inp io.ReadCloser) error {
m.Lock()
defer m.Unlock()
defer inp.Close()
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(inp, &hd)
m.logs = nil
return dec.Decode(&m.logs)
}
func (m *MockSnapshot) Persist(sink raft.SnapshotSink) error {
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(sink, &hd)
if err := enc.Encode(m.logs[:m.maxIndex]); err != nil {
sink.Cancel()
return err
}
sink.Close()
return nil
}
func (m *MockSnapshot) Release() {
}
type App struct {
r *raft.Raft
ps raft.PeerStore
fsm *MockFSM
}
func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/add":
app.addPeer(w, r)
case "/remove":
app.removePeer(w, r)
case "/leader":
app.getLeader(w, r)
case "/peers":
app.getPeers(w, r)
case "/set":
app.setData(w, r)
case "/get":
app.getData(w, r)
default:
w.WriteHeader(http.StatusNotFound)
}
}
var OK = []byte("OK\n")
func (app *App) addPeer(w http.ResponseWriter, r *http.Request) {
addr := r.FormValue("addr")
p, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
w.Write([]byte(err.Error()))
return
}
f := app.r.AddPeer(p)
if err := f.Error(); err != nil {
w.Write([]byte(err.Error()))
return
} else {
w.Write(OK)
}
}
func (app *App) removePeer(w http.ResponseWriter, r *http.Request) {
addr := r.FormValue("addr")
p, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
w.Write([]byte(err.Error()))
return
}
f := app.r.RemovePeer(p)
if err := f.Error(); err != nil {
w.Write([]byte(err.Error()))
return
} else {
w.Write(OK)
}
}
func (app *App) getLeader(w http.ResponseWriter, r *http.Request) {
addr := app.r.Leader()
if addr == nil {
w.Write([]byte("no leader\n"))
return
}
w.Write([]byte(addr.String() + "\n"))
}
func (app *App) getPeers(w http.ResponseWriter, r *http.Request) {
addrs, err := app.ps.Peers()
if err != nil {
w.Write([]byte(err.Error()))
return
}
s := ""
for _, v := range addrs {
s += v.String()
s += "\n"
}
w.Write([]byte(s))
}
func (app *App) setData(w http.ResponseWriter, r *http.Request) {
if app.r.State() != raft.Leader {
w.Write([]byte(fmt.Sprintf("not leader %v", app.r.Leader())))
return
}
f := app.r.Apply([]byte(r.URL.Query().Encode()), 0)
println("raw queue:", r.URL.Query().Encode())
if err := f.Error(); err != nil {
w.Write([]byte(err.Error()))
return
} else {
w.Write([]byte(fmt.Sprintf("Apply Response: %v\n", f.Response())))
}
}
func (app *App) getData(w http.ResponseWriter, r *http.Request) {
if len(app.fsm.logs) == 0 {
w.Write([]byte("empty data\n"))
return
} else {
w.Write(app.fsm.logs[len(app.fsm.logs)-1])
return
}
}
func main() {
flag.Parse()
app := new(App)
s := raft.NewInmemStore()
addr := fmt.Sprintf("127.0.0.1:%d", (raftBasePort + *n))
println(addr)
trans, err := raft.NewTCPTransport(addr, nil, 1, 0, nil)
if err != nil {
println(err.Error())
return
}
dir := fmt.Sprintf("./dir%d", *n)
os.MkdirAll(dir, os.ModePerm)
var f *raft.FileSnapshotStore
f, err = raft.NewFileSnapshotStore(dir, 2, nil)
if err != nil {
println(err.Error())
return
}
c := raft.DefaultConfig()
c.EnableSingleNode = true
app.ps = &raft.StaticPeers{}
app.fsm = &MockFSM{}
app.r, err = raft.NewRaft(c, app.fsm, s, s, f,
app.ps, trans)
if err != nil {
println(err.Error())
return
}
addr = fmt.Sprintf("127.0.0.1:%d", (httpBasePort + *n))
println(addr)
http.ListenAndServe(addr, app)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment