Skip to content

Instantly share code, notes, and snippets.

@ardeshireshghi
Last active February 27, 2022 19:05
Show Gist options
  • Save ardeshireshghi/94c79513186d51b3294cb0daba6b8821 to your computer and use it in GitHub Desktop.
Save ardeshireshghi/94c79513186d51b3294cb0daba6b8821 to your computer and use it in GitHub Desktop.
const {
WorkerPool
} = require('./WorkerPool');
const pool = new WorkerPool({
size: 24,
taskPath: __dirname + '/test-worker.js'
});
const promises = [...Array(10000).keys()].map(() => {
return pool.run({ data: Math.round(Math.random() * 10000) + 1000000 });
});
(async () => {
const start = Date.now();
await Promise.all(promises);
console.log(
'It took %s seconds with %s workers',
(Date.now() - start) / 1000,
pool.size
);
await pool.terminate();
process.exit();
})();
const { isMainThread, parentPort } = require('worker_threads');
function calc(max) {
let j = 0;
for (var i = 0; i < max; i++) {
j += i;
}
return j;
}
if (!isMainThread) {
parentPort.on('message', (message) => {
parentPort.postMessage(calc(message.data));
});
}
import EventEmitter from 'events';
import { Worker } from 'worker_threads';
class CustomWorker extends Worker {
public isReady: boolean;
constructor(...args: ConstructorParameters<typeof Worker>) {
super(...args);
this.isReady = true;
}
run(workerData) {
this.isReady = false;
return new Promise((resolve, reject) => {
const onMessage = (message) => {
resolve(message);
this.removeListener('error', onError);
this.isReady = true;
this.emit('ready', this);
};
const onError = (err) => {
reject(err);
this.removeListener('message', onMessage);
};
this.once('error', onError);
this.once('message', onMessage);
this.postMessage(workerData);
});
}
}
export class WorkerPool extends EventEmitter {
public size: number;
public taskPath: string;
public workers: CustomWorker[];
public taskQueue: any[];
constructor({ size, taskPath }: { size: number; taskPath: string }) {
super();
this.size = size;
this.taskPath = taskPath;
this.taskQueue = [];
this.workers = this._createWorkers();
}
public async run(workerData: any) {
return new Promise((resolve, reject) => {
this.taskQueue.push({ workerData, resolve, reject });
this.executeTask();
});
}
public async terminate() {
this.workers = [];
this.removeAllListeners();
await Promise.all(this.workers.map((worker) => worker.terminate()));
}
private _createWorkers() {
return Array.from(Array(this.size).keys()).map(() => {
return this._createWorker();
});
}
private _createWorker() {
const worker = new CustomWorker(this.taskPath);
worker.on('ready', (theWorker) => {
this.runTask(theWorker);
});
worker.once('exit', (code) => {
if (code !== 0) {
worker.terminate();
this.workers[this.workers.indexOf(worker)] = new CustomWorker(
this.taskPath
);
}
});
return worker;
}
private async executeTask() {
const freeWorker = this.workers.find((worker) => worker.isReady);
if (freeWorker) {
this.runTask(freeWorker);
}
}
private runTask(worker) {
if (this.taskQueue.length > 0) {
const { workerData, resolve, reject } = this.taskQueue.shift();
if (workerData) {
return worker.run(workerData).then(resolve).catch(reject);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment