Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Raft pseudocode
// This file gives pseudocode for the complete operation of a Raft peer,
// including the fast backtracking optimization. The implementation here is
// currently 0-indexed, as this simplifies the implementation in many cases.
// This implementation also does not discuss locks at all, which will be vital
// in any real implementation.
//
// ============================================================================
// The following data needs to be persisted
// ============================================================================
//
// This is the term this Raft server is currently in
currentTerm = 0
// This is the Raft peer that this server has voted for in *this* term (if any)
votedFor = None
// The log is a list of {term, command} tuples, where the command is an opaque
// value which only holds meaning to the replicated state machine running on
// top of Raft.
log = []
//
// ============================================================================
// The following data is ephemeral
// ============================================================================
//
// The state this server is currently in, can be FOLLOWER, CANDIDATE, or LEADER
state = FOLLOWER
// The Raft entries up to and including this index are considered committed by
// Raft, meaning they will not change, and can safely be applied to the state
// machine.
commitIndex = -1
// The last command in the log to be applied to the state machine.
lastApplied = -1
// nextIndex is a guess as to how much of our log (as leader) matches that of
// each other peer. This is used to determine what entries to send to each peer
// next.
nextIndex = map [..] -> 0
// matchIndex is a measurement of how much of our log (as leader) we know to be
// replicated at each other server. This is used to determine when some prefix
// of entries in our log from the current term has been replicated to a
// majority of servers, and is thus safe to apply.
matchIndex = map [..] -> -1
// This function updates the state machine as a result of the command we pass
// it. In order to build a replicated state machine, we need to call
// stateMachine with the same commands, in the same order, on all servers.
stateMachine func(command)
//
// ============================================================================
// Raft RPC handlers
// ============================================================================
//
RequestVote(term, candidateID, lastLogIndex, lastLogTerm)
-> (term, voteGranted)
{
// step down before handling RPC if need be
if term > currentTerm {
currentTerm = term
state = FOLLOWER
votedFor = -1
nextIndex = map [..] -> length(log)
matchIndex = map [..] -> -1
}
// don't vote for out-of-date candidates
if term < currentTerm {
return (currentTerm, false)
}
// don't double vote
if votedFor != None and votedFor != candidateID {
return (currentTerm, false)
}
// check how up-to-date our log is
ourLastLogIndex = length(log) - 1
ourLastLogTerm = -1
if length(log) != 0 {
ourLastLogTerm = log[ourLastLogIndex].term
}
// reject leaders with old logs
if lastLogTerm < ourLastLogTerm {
return (currentTerm, false)
}
// reject leaders with short logs
if lastLogTerm == ourLastLogTerm and lastLogIndex < ourLastLogIndex {
return (currentTerm, false)
votedFor = candidateID
// TODO: reset election timer
// TODO: persist Raft state
return (currentTerm, true)
}
AppendEntries(term, leaderID, prevLogIndex, prevLogTerm, entries[], leaderCommit)
-> (term, conflictIndex, conflictTerm, success)
{
// step down before handling RPC if need be
if term > currentTerm {
currentTerm = term
state = FOLLOWER
votedFor = -1
nextIndex = map [..] -> length(log)
matchIndex = map [..] -> -1
}
if term < currentTerm {
return (currentTerm, -1, -1, false)
}
// TODO: reset election timer
if prevLogIndex >= length(log) {
return (currentTerm, length(log), -1, false)
}
ourPrevLogTerm = log[prevLogIndex].term
if prevLogIndex >= 0 and ourPrevLogTerm != prevLogTerm {
firstOfTerm = prevLogIndex
for i from prevLogIndex to 0 (inclusive) {
if log[i].term != ourPrevLogTerm {
break
}
firstOfTerm = i
}
return (currentTerm, firstOfTerm, ourPrevLogTerm, false)
}
for i from 0 to length(entries) {
index = prevLogIndex + i + 1
if index >= length(log) or log[index].term != entries[i].term {
log = log[:index] ++ entries[i:]
break
}
}
// TODO: persist Raft state
if leaderCommit > commitIndex {
commitIndex = length(log) - 1
if commitIndex > leaderCommit {
commitIndex = leaderCommit
}
}
if commitIndex > lastApplied {
for i from lastApplied+1 to commitIndex (inclusive) {
stateMachine(log[i].command)
lastApplied = i
}
}
return (currentTerm, -1, -1, true)
}
//
// ============================================================================
// Raft event handlers
// ============================================================================
//
OnElectionTimer() {
if state == LEADER {
return
}
currentTerm += 1
electionTerm = currentTerm
votedFor = -1
state = CANDIDATE
votes = 0
for each peer p {
// NOTE: also request a vote from ourself
// NOTE: me here is this server's identifier
// NOTE: if the RPC fails, it counts as granted = false
// NOTE: these RPCs should be made in parallel
term, granted = p.RequestVote(currentTerm, me, length(log)-1, log[-1].term)
if term > currentTerm {
currentTerm = term
state = FOLLOWER
votedFor = -1
nextIndex = map [..] -> length(log)
matchIndex = map [..] -> -1
}
if granted {
// TODO: reset election timer
votes += 1
}
}
if currentTerm != electionTerm {
return
}
if votes <= #peers/2 {
state = FOLLOWER
return
}
state = LEADER
nextIndex = map [..] -> length(log)
matchIndex = map [..] -> -1
// TODO: reset election timer
// TODO: trigger sending of AppendEntries
}
OnHeartbeatTimerOrSendTrigger() {
// NOTE: it may be useful to have separate timers for each peer, so
// that you can retry AppendEntries to one peer without sending to all
// peers.
if state != LEADER {
return
}
for each peer (including self) {
// NOTE: do this in parallel for each peer
if nextIndex[peer] > length(log) {
rf.nextIndex[peer] = length(log)
}
entries = log[nextIndex[peer]:]
prevLogIndex = rf.nextIndex[peer] - 1
prevLogTerm = -1
if prevLogIndex >= 0 {
prevLogTerm = log[prevLogIndex].term
}
sendTerm = currentTerm
// NOTE: if length(entries) == 0, you may want to check that we
// haven't sent this peer an AppendEntries recently. If we
// have, just return.
// NOTE: if the RPC fails, stop processing for this peer, but
// trigger sending AppendEntries again immediately.
term, conflictIndex, conflictTerm, success
= peer.AppendEntries(sendTerm, me, prevLogIndex, prevLogTerm, entries, commitIndex)
if term > currentTerm {
currentTerm = term
state = FOLLOWER
votedFor = -1
nextIndex = map [..] -> length(log)
matchIndex = map [..] -> -1
}
if currentTerm != sendTerm {
return
}
if !success {
nextIndex[peer] = conflictIndex
if conflictTerm != -1 {
ourLastInConflictTerm = -1
for i from prevLogIndex to 0 (inclusive) {
if log[i].term == conflictTerm {
ourLastInConflictTerm = i
break
} else if log[i].term < conflictTerm {
break
}
}
if ourLastInConflictTerm != -1 {
nextIndex[peer] = ourLastInConflictTerm + 1
}
}
// TODO: Trigger sending AppendEntries again immediately
return
}
matchIndex[peer] = prevLogIndex + length(entries)
nextIndex[peer] = matchIndex[peer] + 1
for n from length(log)-1 to commitIndex {
if log[n].term != currentTerm {
break
}
replicas = 0
for each peer {
if matchIndex[peer] >= n {
replicas += 1
}
}
if replicas > #peers/2 {
commitIndex = n
break
}
}
}
}
OnStart(command)
-> (accepted, willCommitAt)
{
if state != LEADER {
return (false, -1)
}
log = log ++ [(currentTerm, command)]
nextIndex[me] = length(log)
matchIndex[me] = length(log) - 1
// TODO: persist Raft state
// TODO: trigger sending of AppendEntries
return true, length(log) - 1
}
@AlexLeung

This comment has been minimized.

Copy link

commented May 19, 2018

This implementation is incorrect. Quoting paragraph 3 from page 6 of the Raft paper

While waiting for votes, a candidate may receive an
AppendEntries RPC from another server claiming to be
leader. If the leader’s term (included in its RPC) is at least
as large as the candidate’s current term, then the candidate
recognizes the leader as legitimate and returns to follower
state. If the term in the RPC is smaller than the candidate’s
current term, then the candidate rejects the RPC and continues
in candidate state

You never have this check in the AppendEntries handler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.