Skip to content

Instantly share code, notes, and snippets.

@YouCanKeepSilence
Last active December 21, 2020 09:23
Show Gist options
  • Save YouCanKeepSilence/9bc480d8e596229b66ee4dad4d02c42b to your computer and use it in GitHub Desktop.
Save YouCanKeepSilence/9bc480d8e596229b66ee4dad4d02c42b to your computer and use it in GitHub Desktop.
Consumer broker with task delegation to workers on Node.js cluster
/*
Example of a broker with a worker cluster. This kind of architecture is useful
when you have a gateway (an API, for example) that receives a large number of tasks (CPU-bound tasks such as XML string creation)
and you want to delegate these tasks to workers (to speed up processing) without using an additional broker (such as Kafka).
*/
const cluster = require('cluster');
const express = require('express');
const http = require('http');
// Enable to test worker restore after dying
const ENABLE_RANDOM_WORKER_DYING = false;
let freeWorkers = [];
/**
 * Builds an Express app exposing `POST /test/:id` and starts an HTTP server
 * for it on port 3003.
 *
 * The route handler logs the id of the cluster worker that received the
 * request together with the `:id` path parameter, then answers with an empty
 * 200 response.
 */
function createServer() {
  // Logs which worker handled the request and acknowledges it with 200.
  const handleTestRequest = (req, res) => {
    console.log(`${cluster.worker.id}: Worker got message: ${req.params.id}`);
    res.status(200).send();
  };

  const app = express();
  const router = express.Router({});
  router.post('/test/:id', handleTestRequest);
  app.use('/', router);

  http.createServer(app).listen(3003);
}
/**
 * Master-side dispatch loop: roughly every 100 ms, hands one simulated task to
 * a free worker.
 *
 * Seeds the module-level `freeWorkers` pool with the ids of all workers that
 * exist when the loop starts, then loops forever. On each iteration it takes
 * one id from the pool, sends that worker a `{wait}` message (a random delay
 * below 1000 ms) and returns the id to the pool once the worker replies.
 *
 * @returns {Promise<void>} Never resolves in normal operation (infinite loop);
 *   rejects if an iteration throws.
 */
async function brokerLoop() {
  // Object.keys() yields strings, but worker ids are numbers. Normalize so that
  // strict-equality lookups against `worker.id` can find entries in the pool.
  freeWorkers = Object.keys(cluster.workers).map(Number);
  while (true) {
    // Yield to the event loop; otherwise 'message' events from workers are
    // never delivered.
    await new Promise((resolve) => setTimeout(resolve, 100));

    const freeWorkerId = freeWorkers.pop();
    if (freeWorkerId == null) {
      // No free worker, wait for the next iteration.
      continue;
    }

    const worker = cluster.workers[freeWorkerId];
    if (worker == null || !worker.isConnected()) {
      // The id is stale (the worker exited or its IPC channel is closed).
      // Drop it instead of crashing the loop on `undefined.send(...)`.
      console.log(`Skipping worker ${freeWorkerId}: no longer available`);
      continue;
    }

    const waitTimeTask = Math.random() * 1000;
    // Register the reply listener before sending so the reply cannot be missed.
    worker.once('message', ({ wait }) => {
      console.log(`Done task: ${wait} on worker: ${freeWorkerId}`);
      freeWorkers.push(freeWorkerId);
    });
    worker.send({ wait: waitTimeTask });
    console.log(`Task ${waitTimeTask} sent to worker: ${freeWorkerId}/${worker.process.pid}`);
  }
}
if (cluster.isMaster) {
  // Master process: fork the worker pool, respawn workers that exit, and run
  // the broker loop that distributes tasks.
  const WORKER_COUNT = 4;
  for (let i = 0; i < WORKER_COUNT; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code) => {
    console.log(`Worker ${worker.process.pid} died ${code}. Worker: ${worker.id}`);
    // Remove the dead worker from the free pool, but only if it is actually
    // there. The pool may hold ids as strings (from Object.keys) or numbers, so
    // compare numerically. Without the `idx !== -1` guard, splice(-1, 1) would
    // evict the LAST entry, i.e. an unrelated healthy worker.
    const idx = freeWorkers.findIndex((id) => Number(id) === worker.id);
    if (idx !== -1) {
      freeWorkers.splice(idx, 1);
    }
    // Restore the pool size by forking a replacement and marking it free.
    const replacement = cluster.fork();
    freeWorkers.push(replacement.id);
  });

  brokerLoop().finally(() => console.log('Finished brokerLoop'));
} else {
  // Worker process. The cluster module lets all workers listen on the same
  // port; each incoming HTTP request is handled by only one of them.
  createServer();
  console.log(`Worker started with pid: ${process.pid} and id: ${cluster.worker.id}`);

  // A task is a `{wait}` message: sleep for `wait` ms to simulate work, then
  // report completion back to the master.
  process.on('message', async ({ wait }) => {
    await new Promise((resolve) => setTimeout(resolve, wait));
    process.send({ wait: wait, workerId: cluster.worker.id });
    console.log(`${process.pid}/${cluster.worker.id} Sent result to task ${wait}.`);

    if (!ENABLE_RANDOM_WORKER_DYING) {
      return;
    }
    // Randomly kill workers to exercise the respawn path (this drops any open
    // connection on the worker). Worker 1 is excluded so that at least one
    // worker is known to stay alive.
    if ((+cluster.worker.id !== 1) && (Math.random() < 0.5)) {
      console.log(`Worker ${process.pid}/${cluster.worker.id} will die`);
      process.exit(0);
    }
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment