Last active
December 21, 2020 09:23
-
-
Save YouCanKeepSilence/9bc480d8e596229b66ee4dad4d02c42b to your computer and use it in GitHub Desktop.
Consumer broker with task delegation to workers on Node.js cluster
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
Example of a broker with a worker cluster. You need this kind of architecture
when you have a gateway (an API, for example) with a large number of tasks (CPU-bound tasks such as XML-string creation)
and want to delegate these tasks to workers (to improve processing speed) without using an additional broker (e.g. Kafka).
*/
const cluster = require('cluster'); | |
const express = require('express'); | |
const http = require('http'); | |
// Enable to test worker restore after dying: when true, every worker except
// the one with id 1 exits with probability 0.5 after reporting a finished task.
const ENABLE_RANDOM_WORKER_DYING = false; | |
// Ids of workers that currently have no task assigned (used by the master process).
// brokerLoop() pops an id to hand out a task and pushes it back when the worker
// replies; the cluster 'exit' handler pushes the id of each replacement worker.
let freeWorkers = []; | |
/**
 * Starts the HTTP endpoint served by a worker process.
 *
 * Registers a single route, POST /test/:id, which logs the id of the worker
 * that handled the request together with the `id` path parameter and answers
 * with an empty 200 response. The server listens on port 3003.
 */
function createServer() {
  // Handler for POST /test/:id — log which worker got the request, then acknowledge.
  const handleTestRequest = (request, response) => {
    console.log(`${cluster.worker.id}: Worker got message: ${request.params.id}`);
    response.status(200).send();
  };

  const routes = express.Router({});
  routes.post('/test/:id', handleTestRequest);

  const application = express();
  application.use('/', routes);

  http.createServer(application).listen(3003);
}
/**
 * Master-side dispatch loop: hands out tasks to idle workers forever.
 *
 * On start, the pool of free workers is initialised with the ids of all
 * currently forked workers. Every 100 ms the loop takes one free worker (if
 * any), sends it a task of the form `{wait: <milliseconds>}` and returns the
 * worker to the pool once it reports back with a message.
 *
 * @returns {Promise<never>} A promise that never fulfils (the loop is
 *   infinite); it rejects only if an unexpected error is thrown inside the loop.
 */
async function brokerLoop() {
  // Object.keys() yields strings, but worker.id is a number. Store numbers so
  // that lookups by worker.id (strict equality) can find the entries.
  freeWorkers = Object.keys(cluster.workers).map(Number);
  while (true) {
    // Yield to the event loop, otherwise 'message' events from workers are never delivered.
    await new Promise((resolve) => setTimeout(resolve, 100));

    const freeWorkerId = freeWorkers.pop();
    if (freeWorkerId == null) {
      // No free worker, wait for the next iteration.
      continue;
    }

    const worker = cluster.workers[freeWorkerId];
    if (worker == null || !worker.isConnected()) {
      // The worker died (or lost its IPC channel) while it was listed as free.
      // Drop the stale id instead of crashing the whole loop on `undefined.send`.
      continue;
    }

    const waitTimeTask = Math.random() * 1000;
    // Subscribe before sending so the reply handler is in place when the task goes out.
    worker.once('message', ({ wait }) => {
      console.log(`Done task: ${wait} on worker: ${freeWorkerId}`);
      freeWorkers.push(freeWorkerId);
    });
    worker.send({ wait: waitTimeTask });
    console.log(`Task ${waitTimeTask} sent to worker: ${freeWorkerId}/${worker.process.pid}`);
  }
}
if (cluster.isMaster) {
  // Master process: fork the workers, replace any worker that exits and run the broker loop.
  for (let id = 1; id <= 4; id++) {
    cluster.fork();
  }
  cluster.on('exit', (worker, code) => {
    console.log(`Worker ${worker.process.pid} died ${code}. Worker: ${worker.id}`);
    // Remove the dead worker from the free pool, but only if it is actually
    // there. Ids are compared as strings because the pool may hold string keys
    // (from Object.keys) as well as numeric worker ids. Without the -1 guard,
    // splice(-1, 1) would drop the last, unrelated free worker.
    const deadId = String(worker.id);
    const idx = freeWorkers.findIndex((id) => String(id) === deadId);
    if (idx !== -1) {
      freeWorkers.splice(idx, 1);
    }
    // Restore the worker after dying: the replacement is immediately available for tasks.
    const replacement = cluster.fork();
    freeWorkers.push(replacement.id);
  });
  brokerLoop().finally(() => console.log('Finished brokerLoop'));
} else {
  // Worker process.
  // Node.js shares the listening TCP port between workers, so only one of them receives a given HTTP request.
  createServer();
  console.log(`Worker started with pid: ${process.pid} and id: ${cluster.worker.id}`);
  // A task is a message {wait}: simulate the work by waiting `wait` milliseconds,
  // then report the result back to the master.
  process.on('message', async ({ wait }) => {
    await new Promise((resolve) => setTimeout(resolve, wait));
    process.send({ wait: wait, workerId: cluster.worker.id });
    console.log(`${process.pid}/${cluster.worker.id} Sent result to task ${wait}.`);
    if (ENABLE_RANDOM_WORKER_DYING) {
      // Randomly kill workers to test that they are restored after dying
      // (note: this breaks any open connection on the worker).
      // Worker 1 is excluded, just to check that it stays alive.
      if ((+cluster.worker.id !== 1) && (Math.random() < 0.5)) {
        console.log(`Worker ${process.pid}/${cluster.worker.id} will die`);
        process.exit(0);
      }
    }
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment