Skip to content

Instantly share code, notes, and snippets.

@jonshipman
Created October 25, 2022 13:28
Show Gist options
  • Save jonshipman/abe627c687a46e7f5ea4b36bb919666c to your computer and use it in GitHub Desktop.
Save jonshipman/abe627c687a46e7f5ea4b36bb919666c to your computer and use it in GitHub Desktop.
NodeJS Worker Forking - Allows workers to execute a command once regardless of process. Primary process keeps track of open "slots" and broadcasts an open PID when getting execution request (through runonce). If the PID matches that process, the callback executes.
import EventEmitter from 'node:events';
import cluster from 'cluster';
import { v4 as uuidv4 } from 'uuid';
import CONFIG, { isDevelopment } from '../configuration.js';
class Worker extends EventEmitter {
variables = {};
workers = {};
started = false;
get workerslist() {
return Object.values(this.workers);
}
get isPrimary() {
return cluster.isPrimary;
}
getId() {
return cluster.worker?.id || '0';
}
getPid() {
return process.pid;
}
getUnique() {
return uuidv4();
}
assignToWorker(pid, activity) {
if (this.variables[activity]) {
// Error if the activity has already been assigned.
throw new Error(`${activity} has been assigned`);
}
this.variables[activity] = pid;
const worker = this.workerslist.find((w) => w.pid === pid);
if (worker) {
this.workers[worker.id].busy = true;
this.workers[worker.id].activities.push(activity);
}
}
removeFromWorker(pid, activity) {
delete this.variables[activity];
const worker = this.workerslist.find((w) => w.pid === pid);
this.workers[worker.id].activities = this.workers[
worker.id
].activities.filter((a) => a !== activity);
this.workers[worker.id].busy =
this.workers[worker.id].activities.length > 0;
}
removeWorker(pid) {
const workers = this.workerslist;
const removed = workers.find((w) => w.pid === pid);
if (removed) {
delete this.workers[removed.id];
}
for (const [key, value] of Object.entries(this.variables)) {
if (value === pid && 0 === key.indexOf('__internal_once_')) {
delete this.variables[key];
}
}
}
getFreeWorker() {
let free;
if (!this.started && !this.workerslist.length) {
free = { ...process, activities: [], busy: false, id: 0 };
this.workers[0] = free;
return free;
}
const workers = this.workerslist;
free = workers.find((w) => !w.busy);
if (!free) {
free = workers.sort((a, z) => {
return a.activities.length > z.activities.length ? 1 : -1;
})[0];
}
return free;
}
start() {
this.started = true;
if (cluster.isPrimary) {
console.log(
'Primary cluster setting up',
CONFIG.WEB_CONCURRENCY,
'workers...'
);
cluster.on('online', (worker) => {
console.log('Worker', worker.process.pid, 'is online');
});
cluster.on('fork', (worker) => {
this.workers[worker.id] = {
activities: [],
busy: false,
id: worker.id,
pid: worker.process.pid,
};
});
cluster.on('message', async (_worker, rootmessage) => {
this.primaryMessager(rootmessage);
});
cluster.on('exit', (worker, code, signal) => {
console.log(
'Worker',
worker.process.pid,
'died with code',
code,
', and signal :',
signal
);
this.removeWorker(worker.process.pid);
if (!isDevelopment) {
console.log('Starting a new worker');
cluster.fork();
} else {
console.log(
'** In development, allowing crash without fork **'
);
process.exit(1);
}
});
for (let i = 0; i < CONFIG.WEB_CONCURRENCY; i++) {
cluster.fork();
}
this.emit('primarythread');
} else {
process.on('message', (message) => {
let topic;
try {
topic = message.topic;
} catch (_e) {
// noop.
}
if (message.value) {
this.variables = message.value;
}
this.emit(topic, this.variables);
});
this.emit('workerthread');
}
return this;
}
primaryMessager(rootmessage) {
let topic;
const { messageId, _pid, ...message } = rootmessage;
try {
topic = message.topic;
} catch (_e) {
// noop.
}
if ('UPDATE' === topic && 'object' === typeof message.value) {
this.variables = { ...this.variables, ...message.value };
}
// Adds a busy signal to the workers object.
if ('UPDATEPID' === topic && 'object' === typeof message.value) {
const free = this.getFreeWorker();
for (const [field, value] of Object.entries(message.value)) {
if (value === free.pid) {
try {
this.assignToWorker(free.pid, field);
} catch (_e) {
// noop.
}
}
}
}
// Removes the busy signal from the workers object.
if ('FREEPID' === topic && 'object' === typeof message.value) {
for (const [field, value] of Object.entries(message.value)) {
this.removeFromWorker(value, field);
}
}
if (this.started) {
this.messageWorkers({
topic: 'PONG-' + messageId,
value: this.variables,
});
}
}
messageWorkers(m) {
for (const id in cluster.workers) {
if (cluster.workers[id]) {
cluster.workers[id].send(m);
}
}
}
message(m) {
return new Promise((resolve) => {
const uniq = this.getUnique();
const msg = { ...m, messageId: uniq };
setTimeout(() => {
if (!this.started) {
this.primaryMessager(msg);
resolve();
} else if (cluster.isWorker) {
process.send(msg);
this.once('PONG-' + uniq, resolve);
} else {
this.messageWorkers(msg);
this.once('PONG-' + uniq, resolve);
}
}, 0);
});
}
ping() {
return this.message({ topic: 'PING' });
}
async get(key) {
await this.message({ topic: 'GET' });
if (this.variables[key]) {
return this.variables[key];
}
return undefined;
}
set(key, value) {
return this.message({ topic: 'UPDATE', value: { [key]: value } });
}
async updatePid(key) {
try {
await this.message({
topic: 'UPDATEPID',
value: { [key]: this.getPid() },
});
} catch (_e) {
// noop.
}
return this.variables[key];
}
freePid(key) {
return this.message({
topic: 'FREEPID',
value: { [key]: this.getPid() },
});
}
/**
* Introduces a delay to allow the messages to propagate to the processes.
*/
delay() {
return this.ping();
}
/**
* Only runs the callback for one session.
* Useful in cron jobs.
*
* @param {string} key Identifying string.
* @param {Function} callback Callback (returns a promise).
*/
async runonce(key, callback) {
if (cluster.isWorker || !this.started) {
let running;
// Attempt to update the PID, will return the pid assigned.
try {
running = await this.updatePid('__internal_once_' + key);
} catch (_e) {
// noop.
}
// If running is not assigned to our PID, bail.
if (running !== this.getPid()) {
return;
}
// We run the callback provided.
try {
await callback();
} finally {
await new Promise((r) => setTimeout(r, 7000));
// Finally, we free the PID from the prime process.
await this.freePid('__internal_once_' + key);
}
}
}
}
const MainWorker = new Worker();
export default MainWorker;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment