Skip to content

Instantly share code, notes, and snippets.

@slonka
Created November 28, 2018 15:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slonka/1f02a9b403891f624dffa2a121927d1c to your computer and use it in GitHub Desktop.
Save slonka/1f02a9b403891f624dffa2a121927d1c to your computer and use it in GitHub Desktop.
const http = require('http');
const _ = require('koa-route');
const Koa = require('koa');
const bodyParser = require('koa-bodyparser');
const request = require('request-promise-native');
const environment = process.env.ENVIRONMENT;
const peersPath = environment === 'test' ? 'peers-test' : 'peers-prod';
const {
peers,
ipPortToName
} = require(`./${peersPath}`);
const {
getPeersWithoutMe,
} = require('./peers-utils');
const app = new Koa();
app.use(bodyParser());
const HOST = process.env.HOST;
const PORT = process.env.PORT;
const nodeId = `${HOST}:${PORT}`;
const roles = {
follower: 'follower',
leader: 'leader',
candidate: 'candidate',
};
const REQUEST_TIMEOUT = 500;
const HEARTBEAT_INTERVAL = 3000;
const LEADER_TIMEOUT = 4000;
const LEADER_TIMEOUT_SPRED = 200;
let role = 'follower';
let lastHeartbeat = null;
let votesGranted = 0;
let currentTerm = 0;
let votedFor = null;
let leaderTimeout;
async function requestVoteFromPeers() {
console.log('requesting vote from peers');
const peersWithoutMe = getPeersWithoutMe(peers, nodeId);
votedFor = nodeId;
const responses = await Promise.all(peersWithoutMe.map(p => request.post({
url: `http://${p}/raft/request-vote`,
json: {
term: currentTerm,
candidateId: nodeId
},
timeout: REQUEST_TIMEOUT,
resolveWithFullResponse: true
})).map(p => p.catch(e => e)));
const positiveVotes = responses.map((r, index) => {
if (r.statusCode === 200) {
return r.body.voteGranted === true && r.body.term === currentTerm;
} else {
console.log('got not valid response from', ipPortToName[peers[index]]);
return false;
}
}).reduce((prev, current) => prev + (current === true ? 1 : 0), 1);
votesGranted = positiveVotes;
return positiveVotes;
}
async function sendHeartBeat() {
if (role === roles.leader) {
const peersWithoutMe = getPeersWithoutMe(peers, nodeId);
const responses = await Promise.all(peersWithoutMe.map(p => request.post({
url: `http://${p}/raft/append-entries`,
json: {
term: currentTerm,
},
timeout: REQUEST_TIMEOUT,
resolveWithFullResponse: true
})).map(p => p.catch(e => e)));
// am I still the leader?
const stillLeader = responses.every(r => {
if (r.statusCode === 200) {
return r.body.term === currentTerm;
} else if (r.statusCode !== 200) {
return true;
} else {
return false;
}
});
if (stillLeader) {
setTimeout(sendHeartBeat, HEARTBEAT_INTERVAL);
} else {
console.log(nodeId, 'lost leadership');
}
}
}
function checkLeaderTimedOut() {
const now = new Date().getTime();
const diff = Math.abs(now - lastHeartbeat);
return diff > LEADER_TIMEOUT;
}
function resetLeaderTimeout() {
clearTimeout(leaderTimeout);
leaderTimeout = setTimeout(startLeaderTimeout, LEADER_TIMEOUT + (LEADER_TIMEOUT_SPRED * Math.random()));
}
async function startLeaderTimeout() {
if ((role === roles.follower || role === roles.candidate) && votedFor === null && checkLeaderTimedOut()) {
currentTerm += 1;
role = roles.candidate;
const positiveVotes = await requestVoteFromPeers();
if (votedFor === nodeId && positiveVotes > peers.length / 2) {
role = roles.leader;
console.log(nodeId, 'became a leader');
currentTerm += 1;
await sendHeartBeat();
} else {
console.log('startLeaderTimeout: retrying');
resetLeaderTimeout();
}
votedFor = null;
}
}
leaderTimeout = setTimeout(startLeaderTimeout, LEADER_TIMEOUT + (LEADER_TIMEOUT_SPRED * Math.random()));
function state(ctx) {
ctx.response.set('Access-Control-Allow-Origin', '*');
ctx.body = {
currentTerm,
lastHeartbeat,
peers: getPeersWithoutMe(peers, nodeId),
role,
nodeId,
votedFor,
votesGranted,
}
}
function appendEntries(ctx) {
lastHeartbeat = new Date().getTime();
const messageTerm = parseInt(ctx.request.body.term, 10);
console.log(new Date(), 'got heartbeat from', ipPortToName[ctx.ip] || ctx.ip, 'with term', messageTerm);
if (messageTerm >= currentTerm) {
currentTerm = messageTerm;
role = roles.follower;
votesGranted = null;
}
resetLeaderTimeout();
votedFor = null;
votesGranted = 0;
ctx.body = {
term: currentTerm,
"success": true
}
}
function requestVote(ctx) {
const candidateId = ctx.request.body.candidateId;
const term = ctx.request.body.term;
console.log('got request to vote for', ipPortToName[candidateId] || candidateId, 'term', term, 'currentTerm', currentTerm, 'votedFor', votedFor);
if (role === roles.leader) {
ctx.response.body = {
term: currentTerm,
voteGranted: false
};
} else if (term === currentTerm && votedFor !== null) {
ctx.response.body = {
term: currentTerm,
voteGranted: false
};
} else if (term > currentTerm) {
console.log('voting for', ipPortToName[candidateId] || candidateId);
votedFor = candidateId;
currentTerm = term;
role = roles.follower;
votesGranted = null;
resetLeaderTimeout();
ctx.response.body = {
term: currentTerm,
voteGranted: true
};
currentTerm = term;
} else {
ctx.response.body = {
term: currentTerm,
voteGranted: false
};
}
}
app.use(_.get('/raft/state', state));
app.use(_.post('/raft/append-entries', appendEntries));
app.use(_.post('/raft/request-vote', requestVote));
http.createServer(app.callback())
.listen(PORT, HOST);
console.log(`listening on port http://${HOST}:${PORT}`);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment