This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Please read the following posts to understand this code:
// - "How Paxos works"
//   http://rystsov.info/2015/09/16/how-paxos-works.html
// - "Read write quorums in Paxos"
//   http://rystsov.info/2015/12/30/read-write-quorums.html
// - "Best of both worlds: Raft's joint consensus + Single Decree Paxos"
//   http://rystsov.info/2016/01/05/raft-paxos.html
// 1. Leader Election (LE)
//    LE isn't an essential part of Paxos: Paxos works fine even if each
//    successive client request is routed to a different proposer. However,
//    if all requests are routed to the same node, we can declare that node
//    the leader and skip the prepare request (NewEpochMsg).
//    We can delegate the routing of client requests to an external
//    layer such as RingPop or Orleans. Even if something goes wrong,
//    consistency still holds because the algorithm "falls back"
//    to the original Paxos.
// 2. Cluster membership change
//    A proposer is switched to the transient mode by turning it off and
//    starting a new proposer with the new configuration. This should be done
//    for each proposer, one at a time, so that availability isn't affected.
/**
 * Paxos proposer for a key/value store. While it believes it is the leader
 * for a key it skips the prepare phase (NewEpochMsg) and only runs the
 * accept phase; otherwise it first re-establishes leadership.
 *
 * All failures are reported by RETURNING an error object, never by throwing.
 */
class Proposer {
  /**
   * @param {Object} cache - per-key entries; each entry holds `isLeader`,
   *   `epoch` and `state` (the current value of the key/value pair for which
   *   this proposer is leader).
   * @param acceptors - the acceptors every message is sent to.
   * @param ignore - acceptors whose answers are ignored on the read phase;
   *   must provide `contains(acceptor)`.
   * @param time - generator of "ballot" numbers; must provide `newEpoch()`
   *   and `fastforward(tick)`.
   * @param quorum - read & write quorum sizes (`read`, `write`).
   */
  constructor(cache, acceptors, ignore, time, quorum) {
    this.cache = cache;
    this.acceptors = acceptors;
    this.time = time;
    this.quorum = quorum;
    this.ignore = ignore;
  }

  /**
   * Applies `change` to the current value of `key`, replicates the new value
   * to a write quorum and returns `query(newValue)`.
   *
   * @param key - the key to modify.
   * @param {Function} change - `(state) => [newState, err]`.
   * @param {Function} query - `(newState) => result`.
   * @param due - deadline; the request timeout is computed as `due - now()`
   *   (presumably an absolute timestamp in the units of `now()` — confirm).
   * @returns the result of `query`, or an error object: the leader-election
   *   error, the accept-phase error, or the error reported by `change`.
   */
  changeQuery(key, change, query, due) {
    // Create the entry on first use so a fresh key does not crash on
    // `undefined.isLeader`.
    if (this.cache[key] === undefined) {
      this.cache[key] = {};
    }
    const entry = this.cache[key];
    if (!entry.isLeader) {
      const err1 = this._becomeLeader(key, due);
      if (err1) return err1;
    }
    const [state, err2] = change(entry.state);
    entry.state = state;
    // Ticks include the ID of their parent epoch, so it is always possible
    // to check whether two ticks belong to the same epoch.
    const tick = entry.epoch.tick();
    // Arguments are passed positionally: `msg=...`/`timeout=...` would be
    // assignments to undeclared variables, a ReferenceError in strict mode
    // (class bodies are always strict).
    const resp = send(this.acceptors, new Accept(key, tick, state), due - now());
    const [, err3] = this._await(key, resp, (x) => x.isOk, this.quorum.write);
    if (err3) return err3;
    // The state produced by `change` is written to a quorum even when
    // `change` reported an error (presumably so a rejected change still
    // confirms the value it read — NOTE(review): confirm intent).
    if (err2) return err2;
    return query(state);
  }

  /**
   * Prepare phase: starts a new epoch, collects a read quorum of accepted
   * NewEpochMsg responses from non-ignored acceptors, adopts the state of the
   * response with the highest tick and marks this proposer as leader.
   *
   * @returns null on success, otherwise the error produced by `_await`.
   */
  _becomeLeader(key, due) {
    this.cache[key].epoch = this.time.newEpoch();
    const tick = this.cache[key].epoch.tick();
    // `now` must be called; `due - now` would be NaN.
    const resp = send(this.acceptors, new NewEpochMsg(key, tick), due - now());
    const [ok, err1] = this._await(
      key,
      resp,
      (x) => x.isAccepted && !this.ignore.contains(x.acceptor),
      this.quorum.read
    );
    if (err1) return err1;
    this.cache[key].state = ok.max((x) => x.tick).state;
    this.cache[key].isLeader = true;
    return null;
  }

  /**
   * Waits until at least `atLeast` responses satisfy `filter`, then calls
   * `resp.abort()` and fast-forwards the ballot generator past the tick of
   * every conflicting response it returns.
   *
   * @returns `[ok, null]` on success; `[null, ConcurrentModificationError]`
   *   when the failure reports `hasExcluded` (leadership for `key` is dropped);
   *   `[null, NetworkError]` for any other failure.
   */
  _await(key, resp, filter, atLeast) {
    const [ok, err] = resp.filter((x) => filter(x)).atLeast(atLeast);
    const aborted = resp.abort();
    for (const conflict of aborted.filter((x) => x.isConflict)) {
      this.time.fastforward(conflict.tick);
    }
    if (err && err.hasExcluded) {
      this.cache[key].isLeader = false;
      return [null, new ConcurrentModificationError()];
    }
    if (err) {
      return [null, new NetworkError()];
    }
    return [ok, null];
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment