Skip to content

Instantly share code, notes, and snippets.

@jirevwe
Last active June 12, 2021 12:45
Show Gist options
  • Save jirevwe/e870ed84512377973cb7618aa298798b to your computer and use it in GitHub Desktop.
Save jirevwe/e870ed84512377973cb7618aa298798b to your computer and use it in GitHub Desktop.
NodeJs Worker Pool with Callback
const sleep = (ms = 1000) => new Promise((resolve) => setTimeout(resolve, ms));
async function start() {
const execFile = join(__dirname, './worker/index.js');
const pool = new ThreadPool<number, number>({
execFile,
jobCallback,
timeOutCallback,
debug: true,
threadTimeout: 3
});
pool.run(0);
pool.run(1);
function timeOutCallback() {
process.exit(0);
}
async function jobCallback(payload: number) {
console.log(payload);
await sleep(2000);
if (payload < 1_000_000) {
pool.run(payload * 2);
pool.run(payload * 3);
}
}
}
start();
function processThisShit(payload: WorkerRequest) {
for (const key in payload) {
payload[key] = payload[key] + ' — (processed)';
}
parentPort.postMessage(payload);
}
parentPort.on('message', processThisShit);
import { Worker as Thread } from 'worker_threads';
import os from 'os';
/**
* Creates a thread pool
*
* @param threadCount the number of threads to spawn
* @param execFile the file containing the worker function
* @param jobCallback the result callback (executed on the main thread)
* @param tasks initial tasks that will be seeded on the queue
*/
export interface ThreadPoolOptions<Task, Result> {
jobCallback: (result: Result) => {};
timeOutCallback: () => void;
execFile: string;
threadTimeout?: number;
threadCount?: number;
tasks?: Task[];
debug?: boolean;
}
export class ThreadPool<Task, Result> {
private threads: Thread[] = [];
private idle: number[] = [];
private queue: Task[] = [];
private timeout: NodeJS.Timeout;
constructor(private readonly options: ThreadPoolOptions<Task, Result>) {
const { tasks, debug, execFile, threadCount } = options;
if (!debug) {
options.debug = false;
}
options.threadTimeout = options.threadTimeout
? options.threadTimeout * 1000
: 10 * 1000;
if (!threadCount) {
options.threadCount = os.cpus().length * 2;
}
if (tasks) {
this.queue.push(...tasks);
}
for (let i = 0; i < options.threadCount; i++) {
const worker = new Thread(execFile);
this.threads.push(worker);
this.idle.push(worker.threadId);
this.registerCallbacks(worker);
}
}
/**
* Adds work to the pool and tries to execute it
*/
run(payload: Task) {
this.queue.push(payload);
this.runNext();
}
/**
* Picks the first task from queue and runs it
*/
private runNext() {
if (this.options.debug) {
console.log(`queue: ${this.queue.length}, idle: ${this.idle.length}`);
}
// the threads are idle and the queue is empty
if (this.idle.length === this.threads.length && this.queue.length === 0) {
this.timeout = setTimeout(
this.options.timeOutCallback,
this.options.threadTimeout
);
return;
}
if (this.idle.length === 0) return;
// we have more jobs, don't stop the threads
if (this.timeout) {
clearTimeout(this.timeout);
this.timeout = null;
}
// get the first idle worker
const worker = this.threads.find((it) => it.threadId === this.idle[0]);
// dequeue the first item
const payload = this.queue.shift();
if (!payload) return;
this.formatDebugMessage(payload, 'task');
// give the worker work to do
worker.postMessage(payload);
// remove the idle worker from the idle pool
this.idle.shift();
}
private formatDebugMessage(payload: Task | Result, type: 'task' | 'result') {
if (!this.options.debug) return;
const data =
typeof payload === 'object' ? JSON.stringify(payload) : payload;
console.log(
`${type} ${type === 'task' ? 'received' : 'sent'}, data: (${data})`
);
}
private registerCallbacks(worker: Thread) {
worker.on('message', (result: Result) => {
this.formatDebugMessage(result, 'result');
this.options.jobCallback(result);
this.idle.push(worker.threadId);
this.runNext();
});
worker.on('error', (error) => {
console.log(
`thread with thread id (${worker.threadId}) received an error event: ${error.message}`
);
this.idle.push(worker.threadId);
this.runNext();
});
worker.on('messageerror', (error) => {
console.log(
`thread with thread id (${worker.threadId}) received a messageerror event: (${error.message})`
);
});
worker.on('online', () => {
if (this.options.debug) {
console.log(`thread with id (${worker.threadId}) online`);
}
});
worker.on('exit', (code) => {
console.log(
`thread with thread id (${worker.threadId}) exited with error code (${code})`
);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment