Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
// Please read the following posts to understand this code:
// - "How Paxos works"
// http://rystsov.info/2015/09/16/how-paxos-works.html
// - "Read write quorums in Paxos"
// http://rystsov.info/2015/12/30/read-write-quorums.html
// - "Best of both worlds: Raft's joint consensus + Single Decree Paxos"
// http://rystsov.info/2016/01/05/raft-paxos.html
// 1. Leader Election (LE)
// LE isn't an essential part of Paxos. Paxos works fine even if every
// next clent's request is routed to a different proposer. However if
// all requests are routed to the same node then we can declare
// this node as a Leader and skip the prepare request (NewEpochMsg).
// We can deligate the task of routing of client's requests to the external
// layer like RingPop or Orleans. Even if something goes wrong we
// can be sure that the consistency holds becase our algorithm "falls back"
// to the original Paxos
// 2. Cluster membership change
// The switch of a proposer to the transient mode is done by turning it off and
// starting a new proposer with a new configuration. It should be done
// for each proposer one by one so the availability isn't affected.
class Proposer {
constructor(cache, acceptors, ignore, time, quorum) {
this.cache = cache; // keeps the current value of the key/value pair
// for which this proposer is leader
this.acceptors = acceptors;
this.time = time; // generator of "ballot" numbers
this.quorum = quorum; // stores read & write quorum sizes
this.ignore = ignore; // list of acceptors to ignore (on the read phase)
}
changeQuery(key, change, query, due) {
if (!this.cache[key].isLeader) {
const err1 = this._becomeLeader(key, due);
if (err1) return err1;
}
const [state, err2] = change(this.cache[key].state);
this.cache[key].state = state;
const tick = this.cache[key].epoch.tick(); // BTW 'tick's include an ID of the 'parant' epoch
// so it's always possible to check if they are belong
// to the same epoch or not.
const resp = send(this.acceptors, msg=new Accept(key, tick, state), timeout=(due - now()));
const [ok, err3] = this._await(key, resp, x => x.isOk, this.quorum.write);
if (err3) return err3;
if (err2) return err2;
return query(state);
}
_becomeLeader(key, due) {
this.cache[key].epoch = this.time.newEpoch();
const tick = this.cache[key].epoch.tick();
const resp = send(this.acceptors, msg=new NewEpochMsg(key, tick), timeout=(due-now));
const [ok, err1] = this._await(
key, resp, x => x.isAccepted && !this.ignore.contains(x.acceptor), this.quorum.read
);
if (err1) return err1;
this.cache[key].state = ok.max(x => x.tick).state;
this.cache[key].isLeader = true;
return null;
}
_await(key, resp, filter, atLeast) {
const [ok, err] = resp.filter(x => filter(x)).atLeast(atLeast);
resp = resp.abort()
for (const x of resp.filter(x => x.isConflict)) {
this.time.fastforward(x.tick);
}
if (err && err.hasExcluded) {
this.cache[key].isLeader = false;
return [null, new ConcurrentModificationError()];
} else if (err) {
return [null, new NetworkError()];
}
return [ok, null];
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment