Created
November 28, 2018 15:35
-
-
Save slonka/1f02a9b403891f624dffa2a121927d1c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const http = require('http'); | |
const _ = require('koa-route'); | |
const Koa = require('koa'); | |
const bodyParser = require('koa-bodyparser'); | |
const request = require('request-promise-native'); | |
// Peer table selection: ENVIRONMENT=test loads ./peers-test, any other value
// (including unset) loads ./peers-prod.
const environment = process.env.ENVIRONMENT;
const peersPath = environment === 'test' ? 'peers-test' : 'peers-prod';
const { peers, ipPortToName } = require(`./${peersPath}`);
const { getPeersWithoutMe } = require('./peers-utils');

// Koa application; the body parser is registered before any route so the
// handlers below can read ctx.request.body.
const app = new Koa();
app.use(bodyParser());

// Address the HTTP server binds to. "HOST:PORT" is also this node's id.
const { HOST, PORT } = process.env;
const nodeId = `${HOST}:${PORT}`;
// Names of the roles a node can hold.
const roles = {
  follower: 'follower',
  leader: 'leader',
  candidate: 'candidate',
};

// Timings, all in milliseconds.
// NOTE(review): "SPRED" looks like a typo for "SPREAD"; the name is kept
// because other parts of the file reference it.
const REQUEST_TIMEOUT = 500; // per-request timeout for calls to peers
const HEARTBEAT_INTERVAL = 3000; // delay between two heartbeat rounds
const LEADER_TIMEOUT = 4000; // base delay of the election timer
const LEADER_TIMEOUT_SPRED = 200; // upper bound of the random jitter added to it

// Mutable node state, shared by the timers and the HTTP handlers.
let role = roles.follower;
let lastHeartbeat = null; // epoch ms of the last append-entries request, null until one arrives
let votesGranted = 0; // tally of this node's latest election
let currentTerm = 0;
let votedFor = null; // id of the candidate this node voted for, or null
let leaderTimeout; // handle of the pending election timer
/**
 * Asks every other peer to vote for this node in the current term.
 *
 * Side effects: sets `votedFor` to this node's id before the requests go out
 * and stores the final tally in `votesGranted`.
 *
 * @returns {Promise<number>} Number of votes received, counting this node's
 *   own vote. A peer that fails, times out or answers with a non-200 status
 *   counts as a "no".
 */
async function requestVoteFromPeers() {
  console.log('requesting vote from peers');
  const peersWithoutMe = getPeersWithoutMe(peers, nodeId);
  votedFor = nodeId;

  // A failed request resolves to its error object, so one unreachable peer
  // cannot reject the whole batch.
  const responses = await Promise.all(peersWithoutMe.map((peer) => request.post({
    url: `http://${peer}/raft/request-vote`,
    json: {
      term: currentTerm,
      candidateId: nodeId,
    },
    timeout: REQUEST_TIMEOUT,
    resolveWithFullResponse: true,
  }).catch((err) => err)));

  let positiveVotes = 1; // this node votes for itself
  responses.forEach((response, index) => {
    if (response.statusCode !== 200) {
      // `responses` lines up with `peersWithoutMe` (not with `peers`), so the
      // peer has to be looked up in that list to name the right node.
      const peer = peersWithoutMe[index];
      console.log('got not valid response from', ipPortToName[peer] || peer);
      return;
    }
    // Only a grant issued for the term we are campaigning in counts.
    if (response.body.voteGranted === true && response.body.term === currentTerm) {
      positiveVotes += 1;
    }
  });

  votesGranted = positiveVotes;
  return positiveVotes;
}
/**
 * Sends one heartbeat round (an append-entries request carrying only the
 * term) to every other peer, then decides whether this node is still leader.
 *
 * Does nothing unless `role` is leader. A peer that is unreachable or answers
 * with a non-200 status does not count against the leader. If every peer that
 * answered 200 reports the same term, the next round is scheduled after
 * HEARTBEAT_INTERVAL. Otherwise the node steps down: it becomes a follower,
 * adopts the highest term it was told about and restarts its election timer.
 *
 * @returns {Promise<void>}
 */
async function sendHeartBeat() {
  if (role !== roles.leader) {
    return;
  }

  const peersWithoutMe = getPeersWithoutMe(peers, nodeId);
  // A failed request resolves to its error object instead of rejecting.
  const responses = await Promise.all(peersWithoutMe.map((peer) => request.post({
    url: `http://${peer}/raft/append-entries`,
    json: {
      term: currentTerm,
    },
    timeout: REQUEST_TIMEOUT,
    resolveWithFullResponse: true,
  }).catch((err) => err)));

  // am I still the leader? Only a successful reply with another term says no.
  const disagreeing = responses.filter(
    (response) => response.statusCode === 200 && response.body.term !== currentTerm,
  );
  if (disagreeing.length === 0) {
    setTimeout(sendHeartBeat, HEARTBEAT_INTERVAL);
    return;
  }

  console.log(nodeId, 'lost leadership');
  // Step down for real: without this the node would keep reporting itself as
  // leader, refuse every vote request and never run an election timer again.
  role = roles.follower;
  for (const { body } of disagreeing) {
    if (body.term > currentTerm) {
      currentTerm = body.term;
    }
  }
  resetLeaderTimeout();
}
/**
 * Reports whether the last recorded heartbeat is more than LEADER_TIMEOUT
 * milliseconds away from the current time.
 *
 * While `lastHeartbeat` is still null it coerces to 0 in the subtraction, so
 * the gap is the full epoch time.
 *
 * @returns {boolean} true when the gap exceeds LEADER_TIMEOUT.
 */
function checkLeaderTimedOut() {
  const elapsed = Math.abs(Date.now() - lastHeartbeat);
  return elapsed > LEADER_TIMEOUT;
}
/**
 * Cancels the pending election timer and arms a new one that calls
 * startLeaderTimeout after LEADER_TIMEOUT plus a random jitter of up to
 * LEADER_TIMEOUT_SPRED milliseconds.
 */
function resetLeaderTimeout() {
  clearTimeout(leaderTimeout);
  const jitter = LEADER_TIMEOUT_SPRED * Math.random();
  leaderTimeout = setTimeout(startLeaderTimeout, LEADER_TIMEOUT + jitter);
}
/**
 * Election-timer callback: starts an election when no heartbeat arrived in
 * time.
 *
 * - A leader ignores the timer.
 * - If the last heartbeat is still recent, the timer is simply re-armed.
 * - Otherwise the node bumps the term, becomes a candidate and asks the peers
 *   for votes. With a strict majority of `peers` (and its own vote still in
 *   place) it becomes leader and sends the first heartbeat round; otherwise
 *   the timer is re-armed for another attempt.
 *
 * A vote handed to another candidate does not block the election: the timer
 * is reset when a vote is granted, so if it fires anyway that candidate never
 * established itself and waiting for it would leave this node idle forever.
 *
 * @returns {Promise<void>}
 */
async function startLeaderTimeout() {
  if (role === roles.leader) {
    return;
  }
  if (!checkLeaderTimedOut()) {
    // Keep the timer alive instead of silently dropping it.
    resetLeaderTimeout();
    return;
  }

  currentTerm += 1;
  role = roles.candidate;
  const positiveVotes = await requestVoteFromPeers();

  // `votedFor` no longer pointing at this node means another handler took
  // over while the vote requests were in flight.
  if (votedFor === nodeId && positiveVotes > peers.length / 2) {
    role = roles.leader;
    console.log(nodeId, 'became a leader');
    // NOTE(review): the term is bumped a second time on winning, so heartbeats
    // carry a term one above the one the votes were cast in. Kept as-is; the
    // append-entries handler accepts any term >= its own.
    currentTerm += 1;
    await sendHeartBeat();
  } else {
    console.log('startLeaderTimeout: retrying');
    resetLeaderTimeout();
  }
  votedFor = null;
}
// Arm the first election timer at module load. No timer is pending yet, so
// the clearTimeout inside resetLeaderTimeout has nothing to cancel.
resetLeaderTimeout();
/**
 * GET /raft/state handler: replies with a snapshot of this node's state and
 * allows any origin to read it (Access-Control-Allow-Origin: *).
 *
 * @param {object} ctx Koa request context.
 */
function state(ctx) {
  ctx.response.set('Access-Control-Allow-Origin', '*');
  const snapshot = {
    currentTerm,
    lastHeartbeat,
    peers: getPeersWithoutMe(peers, nodeId),
    role,
    nodeId,
    votedFor,
    votesGranted,
  };
  ctx.body = snapshot;
}
/**
 * POST /raft/append-entries handler: processes a heartbeat from a leader.
 *
 * A heartbeat whose term is at least the local term is accepted: the node
 * records the time, adopts the term, becomes a follower, clears its vote and
 * tally, restarts the election timer and replies `success: true`.
 *
 * A heartbeat with an older or unparsable term is rejected with
 * `success: false` and changes no state, so a stale leader cannot keep
 * postponing this node's election timer.
 *
 * The reply always carries the local `term`.
 *
 * @param {object} ctx Koa request context; reads `ctx.request.body.term`.
 */
function appendEntries(ctx) {
  const messageTerm = Number.parseInt(ctx.request.body.term, 10);
  console.log(new Date(), 'got heartbeat from', ipPortToName[ctx.ip] || ctx.ip, 'with term', messageTerm);

  // Written as a negated >= so that NaN (unparsable term) is rejected too.
  if (!(messageTerm >= currentTerm)) {
    ctx.body = {
      term: currentTerm,
      success: false,
    };
    return;
  }

  lastHeartbeat = Date.now();
  currentTerm = messageTerm;
  role = roles.follower;
  votedFor = null;
  votesGranted = 0;
  resetLeaderTimeout();
  ctx.body = {
    term: currentTerm,
    success: true,
  };
}
/**
 * POST /raft/request-vote handler: decides whether to vote for a candidate.
 *
 * The vote is granted only when this node is not the leader and the
 * candidate's term is strictly newer than the local term. Granting records
 * the vote, adopts the candidate's term, makes this node a follower and
 * restarts the election timer. Every other request is denied without
 * changing any state.
 *
 * The term is parsed as a base-10 integer, like the append-entries handler
 * does, so a term that arrives as a string can never be stored in
 * `currentTerm` (where a later `+= 1` would concatenate instead of add).
 *
 * Reply body: `{ term, voteGranted }`, where `term` is the local term after
 * the decision.
 *
 * @param {object} ctx Koa request context; reads `candidateId` and `term`
 *   from `ctx.request.body`.
 */
function requestVote(ctx) {
  const { candidateId } = ctx.request.body;
  const term = Number.parseInt(ctx.request.body.term, 10);
  console.log('got request to vote for', ipPortToName[candidateId] || candidateId, 'term', term, 'currentTerm', currentTerm, 'votedFor', votedFor);

  // A same-term request can never pass `term > currentTerm`, so no separate
  // "already voted in this term" check is needed. NaN fails the comparison.
  const voteGranted = role !== roles.leader && term > currentTerm;
  if (voteGranted) {
    console.log('voting for', ipPortToName[candidateId] || candidateId);
    votedFor = candidateId;
    currentTerm = term;
    role = roles.follower;
    votesGranted = null;
    resetLeaderTimeout();
  }

  ctx.response.body = {
    term: currentTerm,
    voteGranted,
  };
}
// Routes: state inspection plus the two peer-to-peer endpoints.
const stateRoute = _.get('/raft/state', state);
const appendEntriesRoute = _.post('/raft/append-entries', appendEntries);
const requestVoteRoute = _.post('/raft/request-vote', requestVote);
for (const route of [stateRoute, appendEntriesRoute, requestVoteRoute]) {
  app.use(route);
}

// Serve the Koa app on the configured host and port.
http.createServer(app.callback()).listen(PORT, HOST);
console.log(`listening on port http://${HOST}:${PORT}`);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment