Skip to content

Instantly share code, notes, and snippets.

@MichaelKreil
Created March 17, 2022 13:55
Show Gist options
  • Save MichaelKreil/30efc1585f24e9da80640f2b75e923cc to your computer and use it in GitHub Desktop.
Save MichaelKreil/30efc1585f24e9da80640f2b75e923cc to your computer and use it in GitHub Desktop.
"use strict"
const cluster = require('cluster');
const os = require('os');
Array.prototype.forEachParallel = forEachParallel;
module.exports = {
simpleCluster,
}
/*
let items = [1,1,2,3,5]
await items.forEachParallel(async (item, index) => {
await doStuff(item)
}, 16)
*/
function forEachParallel() {
let callback, maxParallel = os.cpus().length;
switch (arguments.length) {
case 1: [ callback ] = arguments; break;
case 2: [ maxParallel, callback ] = arguments; break;
default:
throw Error('forEachParallel( [ maxParallel, ] callback)')
}
let list = this;
return new Promise((resolve, reject) => {
let running = 0, index = 0, finished = false;
queueMicrotask(next);
function next() {
if (finished) return;
if (running >= maxParallel) return;
if (index >= list.length) {
if (running === 0) {
finished = true;
resolve();
return
}
return
}
running++;
let currentIndex = index++;
callback(list[currentIndex], currentIndex)
.then(() => {
running--;
queueMicrotask(next)
})
.catch(err => {
finished = true;
reject(err);
})
if (running < maxParallel) queueMicrotask(next);
}
})
}
/*
Usage:
simpleCluster(startWorker => {
[1,1,2,3,5].forEachParallel(startWorker, 16)
},
(item, index) => {
// do the work
})
*/
function simpleCluster() {
let mainFunction, workerFunction, singleThread;
switch (arguments.length) {
case 2: [ mainFunction, workerFunction ] = arguments; break;
case 3: [ singleThread, mainFunction, workerFunction ] = arguments; break;
default:
throw Error('simpleCluster( [ singleThread, ] mainFunction, workerFunction )')
}
if (singleThread) return mainFunction(workerFunction);
if (cluster.isMaster) {
mainFunction(function (...args) {
return new Promise(res => {
let worker = cluster.fork();
worker.on('online', () => worker.send(args))
worker.on('message', response => res(response));
})
})
} else if (cluster.isWorker) {
process.on('message', async args => {
let response = await workerFunction(...args);
process.send(response, () => process.exit());
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment