Skip to content

Instantly share code, notes, and snippets.

@bellbind
Last active August 29, 2015 14:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bellbind/375d1de47661f4a343f4 to your computer and use it in GitHub Desktop.
Save bellbind/375d1de47661f4a343f4 to your computer and use it in GitHub Desktop.
[nodejs][es6]emulate Paxos network consensus model
// Paxos network consensus protocols
// Promise of ECMAScript6 used.
// Promise utilities
var Utils = {
moreThanHalf: function (promises) {
return new Promise(function (fulfill, reject) {
var success = 0;
var failure = 0;
var half = (promises.length / 2)|0;
var results = new Array(promises.length);
var errors = new Array(promises.length);
var onFulfill = function (val) {
success++;
results[this.i] = val;
if (success > half) {
fulfill(results);
}
};
var onReject = function (val) {
failure++;
errors[this.i] = val;
if (failure >= half) {
reject(errors);
}
};
promises.forEach(function (promise, i) {
var index = {i: i};
promise.then(onFulfill.bind(index), onReject.bind(index));
});
});
},
timeout: function (promise, msec) {
return new Promise(function (fulfill, reject) {
var tid = setTimeout(function () {
reject("timeout: " + msec);
}, msec);
promise.then(function (val) {
clearTimeout(tid);
fulfill(val);
}, function (val) {
clearTimeout(tid);
reject(val);
});
});
},
delay: function (msec) {
return new Promise(function (fulfill, reject) {
setTimeout(function () {
fulfill(msec);
}, msec);
});
},
};
// sequqnced updates for paxos message
var Proposal = function (sequence, update) {
this.sequence = sequence;
this.update = update;
};
Proposal.prototype.isLaterThan = function (b) {
return this.sequence > b.sequence;
};
Proposal.equals = function (a, b) {
if (!a && !b) return true;
if (!a && b || a && !b) return false;
return a.sequence === b.sequence && a.update === b.update;
};
// Acceptor: broken and delay is for designed failure
var Acceptor = function () {
this.proposals = [new Proposal(-1, "")];
this.next = null;
this.timeout = 1000;
this.timeoutId = null;
this.delay = 0;
this.broken = false;
};
Acceptor.prototype.propose = function (proposal) {
return Utils.delay(this.delay).then(function () {
if (this.broken) {
console.log("propose broken reject: " + proposal.update);
return Promise.reject("reject");
}
return new Promise(function (fulfill, reject) {
this.timeoutId = setTimeout(function () {
console.log("commit timeouted");
this.next = null;
}.bind(this), this.timeout);
var last = this.proposals[this.proposals.length - 1];
if (proposal.isLaterThan(last)) {
this.next = proposal;
fulfill(last);
console.log("propose accept: " + proposal.update);
} else {
reject("reject");
console.log("propose reject: " + proposal.update);
}
}.bind(this));
}.bind(this));
};
Acceptor.prototype.commit = function (proposal, preproposal) {
return Utils.delay(this.delay).then(function () {
if (this.broken) {
console.log("propose broken reject: " + proposal.update);
return Promise.reject("reject");
}
return new Promise(function (fulfill, reject) {
var last = this.proposals[this.proposals.length - 1];
if (Proposal.equals(this.next, proposal) &&
Proposal.equals(last, preproposal)) {
clearTimeout(this.timeoutId);
this.timeoutId = null;
this.proposals.push(this.next);
this.next = null;
fulfill(proposal);
console.log("commit accept: " + proposal.update);
} else {
reject("reject");
console.log("commit reject: " + proposal.update);
}
}.bind(this));
}.bind(this));
};
var Proposer = function () {
this.sequence = 0;
this.acceptors = [];
this.timeout = 1000;
};
Proposer.prototype.update = function (update) {
var proposal = new Proposal(++this.sequence, update);
return Utils.timeout(
Utils.moreThanHalf(this.acceptors.map(function (acceptor) {
return acceptor.propose(proposal);
})).then(function (preproposals) {
var preproposal = preproposals.reduce(function (prev, cur) {
return (cur && (!prev || cur.isLaterThan(prev))) ? cur : prev;
}, null);
return Utils.moreThanHalf(this.acceptors.map(function (acceptor) {
return acceptor.commit(proposal, preproposal);
}));
}.bind(this)).then(function () {
// from here, acccepted watching is in Learner role
return proposal;
}), this.timeout);
};
// main: single proposer case
var main = function () {
var proposer = new Proposer();
for (var i = 0; i < 15; i++) {
var acceptor = new Acceptor();
acceptor.delay = i * 50;
//acceptor.delay = i * 20;
proposer.acceptors.push(acceptor);
}
proposer.update("abc").then(function (update) {
console.log(update);
}, function () {
console.log("rejected: abc");
}).then(function () {
for (var i = 0; i < 15; i++) {
if (i % 5 == 0) proposer.acceptors[i].broken = true;
}
return proposer.update("def");
}).then(function (update) {
console.log(update);
}, function () {
console.log("rejected: def");
}).then(function () {
return proposer.update("ghi");
}).then(function (update) {
console.log(update);
}, function () {
console.log("rejected: ghi");
}).catch(function (err) {
console.log(err);
});
};
main();
@bellbind
Copy link
Author

run on node-0.11.13 (Promise enabled by default)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment