Skip to content

Instantly share code, notes, and snippets.

@Zaggen
Last active May 26, 2022 16:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.
Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.
Node.js load balancer with a Redis-backed server registry and a retry queue
/*
* Author: Zaggen - 2017
* version: 0.1
* github: https://github.com/Zaggen
* Free to use and modify
* */
// Module setup: proxy instance, redis client and the shared state used by the balancer.
const httpProxy = require('http-proxy')
const http = require('http')
// Single proxy instance shared by all requests; the target is passed on every call
const proxy = httpProxy.createProxyServer({})
// From here on `Promise` refers to bluebird, not the native implementation
const Promise = require('bluebird')
// REDIS
const redis = require('redis')
const redisConfig = require('./config/local.js').redisConfig
const redisClient = redis.createClient(redisConfig)
const port = 1337 // Same as the one configured on nginx
// Gives the redis client the promise-returning *Async methods used below (lrangeAsync, lremAsync)
Promise.promisifyAll(redis.RedisClient.prototype)
// Queue system vars/consts
// Retry budget handed to every incoming request
const MAX_ATTEMPS = 5
// Minimum time, in milliseconds, between two validations of the server registry
const REGISTRY_CHECK_TIME = 10000
// Maximum number of parked requests; when reached, the oldest one is answered with a 502
const MAX_QUEUE_SIZE = 20
// Requests waiting for a retry, stored as {req, res, retryAttemptsLeft}
const queue = []
// Position in serversList of the backend chosen last (advanced round-robin)
let serverIndex = 0
// Backend URLs read from the 'server-registry' redis list
let serversList
(async function init() {
// Load the registered backends and keep only the ones that pass the health probe.
// validateRegistryAsync returns the filtered list, so its result (not the raw
// registry content) is what requests get balanced across.
serversList = await validateRegistryAsync(
  await redisClient.lrangeAsync('server-registry', 0, 100)
)
// Timestamp (ms) of the last registry validation. handleRequest compares it with
// REGISTRY_CHECK_TIME and the proxy error handler rewinds it to force a re-check.
let lastRegistryCheck = Date.now()
// Every incoming request starts with the full retry budget.
const server = http.createServer(function(req, res) {
  handleRequest(req, res, MAX_ATTEMPS).catch(function(err) {
    // handleRequest is async: without this handler a failure would surface as an
    // unhandled rejection and the client would never receive an answer.
    console.error(err)
    sendBadGateWay(res)
  })
})
/**
 * Balances one HTTP request across the registered backends (round-robin).
 *
 * Waits while a registry validation is in progress, refreshes the registry when
 * more than REGISTRY_CHECK_TIME ms have passed since the last check, then either
 * proxies the request to the next backend or parks it in the retry queue when no
 * backend is available.
 *
 * @param {object} req - incoming request; `retryAttemptsLeft` is stored on it
 * @param {object} res - response paired with `req`
 * @param {number} retryAttemptsLeft - retries still allowed for this request
 * @returns {Promise<void>}
 */
async function handleRequest(req, res, retryAttemptsLeft) {
  // Re-check after every wake-up: another waiter may have taken the lock again
  // before this one got to run.
  while (validationLock.isActive) {
    await validationLock.waitForRelease()
  }
  req.retryAttemptsLeft = retryAttemptsLeft
  // Once REGISTRY_CHECK_TIME ms have passed since the last check, we validate our registry
  if (Date.now() - lastRegistryCheck > REGISTRY_CHECK_TIME) {
    validationLock.acquire()
    try {
      serversList = await validateRegistryAsync(await redisClient.lrangeAsync('server-registry', 0, 100))
    } catch (err) {
      // The refresh is best effort: keep balancing across the previous list
      console.error(err)
    } finally {
      // Always release, otherwise every later request would wait on the lock forever
      lastRegistryCheck = Date.now()
      validationLock.release()
    }
  }
  // If we have some servers, we process the request
  if (serversList.length > 0) {
    serverIndex = (serverIndex + 1) % serversList.length
    const target = serversList[serverIndex]
    proxy.web(req, res, {target})
  } else {
    queueRequest(req, res)
  }
}
/**
 * Minimal in-process lock used to pause request handling while the registry is
 * being validated. Callers that find it active can await waitForRelease(); all
 * pending waiters are resumed, oldest first, when the lock is released.
 */
const validationLock = {
  _listeners: [],
  _isActive: false,
  get isActive() {
    return this._isActive
  },
  set isActive(state) {
    this._isActive = state
    // Only an explicit `false` wakes the waiters up
    if (state !== false) return
    this._notifyRelease()
  },
  acquire() {
    this.isActive = true
  },
  release() {
    this.isActive = false
  },
  // Resolves the next time the lock is released
  async waitForRelease() {
    return new Promise((wake) => this._onRelease(wake))
  },
  _onRelease(cb) {
    this._listeners.push(cb)
  },
  // Drains the waiter list in FIFO order, calling each callback once
  _notifyRelease() {
    for (let next = this._listeners.shift(); next; next = this._listeners.shift()) {
      next()
    }
  },
}
/**
 * Parks a request until a backend may be available again.
 *
 * Requests with no retries left are answered with a 502 immediately. When the
 * queue is already at MAX_QUEUE_SIZE, the oldest parked request is answered with
 * a 502 to make room. Each parked request schedules one retry whose delay grows
 * as the remaining retry budget shrinks.
 *
 * @param {object} req - request carrying `retryAttemptsLeft`
 * @param {object} res - response paired with `req`
 */
function queueRequest(req, res) {
  const attemptsLeft = req.retryAttemptsLeft
  // Budget exhausted (or never set): give up right away
  if (!(attemptsLeft > 0)) {
    sendBadGateWay(res)
    return
  }
  // Make room by failing the request that has waited the longest
  if (queue.length >= MAX_QUEUE_SIZE) {
    const evicted = queue.shift()
    sendBadGateWay(evicted.res)
  }
  queue.push({req, res, retryAttemptsLeft: attemptsLeft - 1})
  // The fewer attempts left, the longer the wait before the next retry
  scheduleRetry(queue, MAX_ATTEMPS + 1 - attemptsLeft)
}
/**
 * Answers a request with a "502 Bad Gateway" HTML page.
 *
 * When the response headers were already sent (a partially proxied response),
 * the status can no longer change and appending an HTML page would corrupt the
 * body, so the response is only closed.
 *
 * @param {object} res - response to terminate; objects without `setHeader`
 *   are still given the status fields and the body, as before
 */
function sendBadGateWay(res) {
  if (res.headersSent) {
    res.end()
    return
  }
  res.statusCode = 502
  res.statusMessage = 'Bad Gateway'
  // Declare the body as HTML so clients do not have to guess the content type
  if (typeof res.setHeader === 'function') {
    res.setHeader('Content-Type', 'text/html; charset=utf-8')
  }
  res.end(`
<body style="text-align: center; padding: 10px;">
<h1>502 Server Error</h1>
<hr/>
<p>
The server encountered a temporary error and could not complete your request.
Please try again later and if the problem persists then contact support and explain in detail how and when
the error occurred.
</p>
<p>Thank you for your kind understanding.</p>
</body>
`)
}
// WebSocket upgrades: re-read the registry and hand the socket to the next backend.
// NOTE(review): this replaces the shared serversList with the raw registry content
// (no health probe), which HTTP requests then use until the next validation - confirm
// that this is intended.
server.on('upgrade', async function(req, socket, head) {
  try {
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
  } catch (err) {
    // Without a registry there is nothing to route to; close the socket instead of
    // leaving the client waiting and the rejection unhandled
    console.error(err)
    socket.destroy()
    return
  }
  if (serversList.length === 0) {
    // No backend registered: close the connection rather than leaving it open forever
    socket.destroy()
    return
  }
  serverIndex = (serverIndex + 1) % serversList.length
  const target = serversList[serverIndex]
  proxy.ws(req, socket, head, {target})
})
// Proxy failures: force a registry re-check, then retry, queue or fail the request.
proxy.on('error', function (err, req, res) {
  // This will force a registry check on the next request
  lastRegistryCheck -= REGISTRY_CHECK_TIME
  console.error('Error')
  console.error(err)
  // For WebSocket proxying the third argument is the raw client socket, not an
  // HTTP response: an HTML 502 page must not be written into it.
  if (!(res instanceof http.ServerResponse)) {
    if (res && typeof res.destroy === 'function') {
      res.destroy()
    }
    return
  }
  if (err.code === 'ECONNREFUSED' && req.retryAttemptsLeft === MAX_ATTEMPS) {
    // First failure of this request: retry immediately against the next backend
    handleRequest(req, res, MAX_ATTEMPS - 1).catch(function(retryErr) {
      console.error(retryErr)
      sendBadGateWay(res)
    })
  } else if (err.code === 'ECONNREFUSED' && req.retryAttemptsLeft > 0) {
    // Later failures go through the delayed retry queue
    queueRequest(req, res)
  } else {
    sendBadGateWay(res)
  }
})
/**
 * Schedules one retry of the oldest queued request.
 *
 * After 200 ms times `delayMultiplier`, the first entry of `queue` (if any) is
 * removed and handled again, unless its client socket is already destroyed, in
 * which case the response is simply ended.
 *
 * @param {Array} queue - entries of the form {req, res, retryAttemptsLeft}
 * @param {number} delayMultiplier - factor applied to the 200 ms base delay
 */
function scheduleRetry(queue, delayMultiplier) {
  const delayMs = 200 * delayMultiplier
  function retryOldest() {
    if (queue.length === 0) return
    const entry = queue.shift()
    if (entry.res.socket.destroyed) {
      // CONNECTION RESET BY CLIENT
      entry.res.end()
      return
    }
    handleRequest(entry.req, entry.res, entry.retryAttemptsLeft)
  }
  setTimeout(retryOldest, delayMs)
}
/**
 * Probes every registered backend and returns the ones that answered.
 *
 * Backends failing the probe are removed from the 'server-registry' redis list.
 * The removal is best effort: it is not awaited and a failure is only logged.
 * Probes run concurrently because this runs while the validation lock is held,
 * so probing one server after another would add up on every waiting request.
 *
 * @param {string[]} serversList - backend URLs to check
 * @returns {Promise<string[]>} the live backends, in their original order
 */
async function validateRegistryAsync(serversList) {
  const candidates = Array.from(serversList)
  const probeResults = await Promise.all(candidates.map((server) => testEndPoint(server)))
  const updatedServerList = []
  candidates.forEach((server, index) => {
    if (probeResults[index]) {
      updatedServerList.push(server)
      return
    }
    // Fire-and-forget, but never leave the rejection unhandled
    redisClient.lremAsync('server-registry', 0, server).catch((err) => {
      console.error(err)
    })
  })
  return updatedServerList
}
/**
 * Health probe: issues an HTTP GET against `url`.
 *
 * Any HTTP response counts as alive. Only a refused connection, or a URL that
 * http.get rejects synchronously, counts as dead; every other error, including
 * the probe timeout, keeps the server (same benefit-of-the-doubt rule as before).
 * The timeout exists so that a backend that accepts the connection but never
 * answers cannot stall registry validation.
 *
 * @param {string} url - backend URL to probe
 * @param {number} [timeoutMs=5000] - maximum time to wait for the probe to settle
 * @returns {Promise<boolean>} true when the backend is considered alive
 */
async function testEndPoint(url, timeoutMs = 5000) {
  return new Promise((resolve) => {
    let timer
    const settle = (isAlive) => {
      clearTimeout(timer)
      resolve(isAlive)
    }
    try {
      const probe = http.get(url, (response) => {
        // The body is irrelevant, but it must be consumed so the socket is released
        response.resume()
        settle(true)
      })
      probe.on('error', (err) => {
        settle(err.code !== 'ECONNREFUSED')
      })
      timer = setTimeout(() => {
        const timeoutError = new Error(`Health probe to ${url} timed out after ${timeoutMs}ms`)
        timeoutError.code = 'ETIMEDOUT'
        // Surfaces through the 'error' handler above
        probe.destroy(timeoutError)
      }, timeoutMs)
    } catch (err) {
      settle(false)
    }
  })
}
// Start accepting connections and report the port once the server is bound
const announceListening = () => console.log(`Load balancer listening on port ${port}`)
server.listen(port, announceListening)
})()
@Zaggen
Copy link
Author

Zaggen commented Mar 8, 2017

  • On server start, call redisClient.rpush.apply(redisClient, ['server-registry', serverUrl]) (make sure the URL is not already in the list, or use a set instead)
  • On server shutdown, call redisClient.lrem('server-registry', 0, serverUrl)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment