Skip to content

Instantly share code, notes, and snippets.

@yusufsyaifudin
Created May 29, 2020 07:51
Show Gist options
  • Save yusufsyaifudin/f404e784daa415b91ec7fb2fd7c36b68 to your computer and use it in GitHub Desktop.
Save yusufsyaifudin/f404e784daa415b91ec7fb2fd7c36b68 to your computer and use it in GitHub Desktop.
package fsm
import (
"encoding/json"
"fmt"
"github.com/dgraph-io/badger/v2"
"github.com/hashicorp/raft"
"io"
"os"
"strings"
)
// badgerFSM is a raft.FSM implementation whose state lives in a BadgerDB
// database. Its Apply, Snapshot and Restore methods are declared in this file;
// instances are created through NewBadger.
type badgerFSM struct {
// db is the BadgerDB handle supplied to NewBadger.
// NOTE(review): presumably used by the set/get/delete helpers, which are
// defined outside this section — confirm.
db *badger.DB
}
// Apply is invoked once a log entry is committed.
// It returns a value which will be made available in the
// ApplyFuture returned by Raft.Apply method if that
// method was called on the same Raft node as the FSM.
//
// Only entries of type raft.LogCommand are handled. Their Data is decoded as a
// JSON CommandPayload and dispatched on the trimmed, upper-cased Operation:
//
//   - "SET":    stores Value under Key and echoes Value back in Data.
//   - "GET":    reads Key and returns what was read in Data.
//   - "DELETE": removes Key; Data is nil.
//
// For these three operations the result is an *ApplyResponse whose Error field
// carries the outcome of the storage call. For every other situation (a
// non-command log entry, an undecodable payload, an unknown operation) a
// diagnostic is written to stderr and nil is returned.
func (b badgerFSM) Apply(log *raft.Log) interface{} {
	if log.Type != raft.LogCommand {
		_, _ = fmt.Fprintf(os.Stderr, "not raft log command type\n")
		return nil
	}

	var payload CommandPayload
	if err := json.Unmarshal(log.Data, &payload); err != nil {
		_, _ = fmt.Fprintf(os.Stderr, "error unmarshalling store payload %s\n", err.Error())
		return nil
	}

	// Operation names are matched case-insensitively and ignoring surrounding
	// whitespace.
	switch op := strings.ToUpper(strings.TrimSpace(payload.Operation)); op {
	case "SET":
		return &ApplyResponse{
			Error: b.set(payload.Key, payload.Value),
			Data:  payload.Value,
		}
	case "GET":
		data, err := b.get(payload.Key)
		return &ApplyResponse{
			Error: err,
			Data:  data,
		}
	case "DELETE":
		return &ApplyResponse{
			Error: b.delete(payload.Key),
			Data:  nil,
		}
	default:
		// Report the real cause instead of claiming the entry was not a
		// command: the log type was fine, the operation was not.
		_, _ = fmt.Fprintf(os.Stderr, "unknown store operation %q\n", op)
		return nil
	}
}
// Snapshot is called by raft when it wants to take a snapshot, which it uses
// to support log compaction.
// This FSM hands back whatever newSnapshotNoop produces: the data is already
// persisted on disk (in BadgerDB) at the time raft calls Apply, so no separate
// copy of the state is captured here.
func (b badgerFSM) Snapshot() (raft.FSMSnapshot, error) {
	snapshot, err := newSnapshotNoop()
	return snapshot, err
}
// Restore is used to restore an FSM from a Snapshot. It is not called
// concurrently with any other command.
//
// The snapshot is read as a stream of consecutive JSON-encoded CommandPayload
// values; each one is written to BadgerDB through b.set. An empty snapshot is
// valid and restores zero entries. The reader is always closed before
// returning, and a failure to close it is only logged.
//
// NOTE(review): the raft contract says the FSM must discard all previous
// state, but this method only calls b.set and never deletes anything —
// confirm that is acceptable for this store.
//
// It returns the first decode or persist error encountered, or nil once the
// end of the stream has been reached cleanly.
func (b badgerFSM) Restore(rClose io.ReadCloser) error {
	defer func() {
		if err := rClose.Close(); err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[FINALLY RESTORE] close error %s\n", err.Error())
		}
	}()

	_, _ = fmt.Fprintf(os.Stdout, "[START RESTORE] read all message from snapshot\n")

	var totalRestored int
	decoder := json.NewDecoder(rClose)
	for {
		var data CommandPayload
		err := decoder.Decode(&data)
		if err == io.EOF {
			// Clean end of the snapshot. The previous implementation went on
			// to read a "closing bracket" token that was never opened, which
			// made every restore fail, including an empty one.
			break
		}
		if err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error decode data %s\n", err.Error())
			return err
		}

		if err := b.set(data.Key, data.Value); err != nil {
			_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] error persist data %s\n", err.Error())
			return err
		}

		totalRestored++
	}

	_, _ = fmt.Fprintf(os.Stdout, "[END RESTORE] success restore %d messages in snapshot\n", totalRestored)
	return nil
}
// NewBadger returns a raft.FSM implementation that uses the given BadgerDB
// handle. The handle is stored as-is.
func NewBadger(badgerDB *badger.DB) raft.FSM {
	machine := new(badgerFSM)
	machine.db = badgerDB
	return machine
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment