Skip to content

Instantly share code, notes, and snippets.

@talaikis
Created July 25, 2021 19:34
Show Gist options
  • Save talaikis/baab913636737250850438ddb616fb5d to your computer and use it in GitHub Desktop.
Save talaikis/baab913636737250850438ddb616fb5d to your computer and use it in GitHub Desktop.
const child = require('child_process')
const { cpus } = require('os')
const transform = require('stream-transform')
class Pool {
constructor (file, maxPool, done) {
this.pool = []
this.active = []
this.waiting = []
this.maxPool = maxPool
const releaseWorker = (function (worker) {
this.active = this.active.filter((w) => worker !== w)
this.pool.push(worker)
if (this.waiting.length > 0) {
this.assignWork(this.waiting.shift())
}
}).bind(this)
for (let i = 0; i < maxPool; i++) {
const worker = child.fork(file)
worker.on('message', (...params) => {
done(...params)
releaseWorker(worker)
})
this.pool.push(worker)
}
}
assignWork (data) {
if (this.active.length >= this.maxPool) {
this.waiting.push(data)
}
if (this.pool.length > 0) {
const worker = this.pool.pop()
worker.send(data)
this.active.push(worker)
}
}
}
const JobQueue = {}
const Pooler = new Pool(join(__dirname, 'workers', 'work.js'), cpus().length, (msg) => {
const queue = [...JobQueue[msg.event]]
JobQueue[msg.event] = null
queue.map((cb) => cb(msg.value))
})
const jobBatch = (record, done) => {
if (JobQueue[record[0]]) {
JobQueue[record[0]].push(done)
} else {
JobQueue[record[0]] = [done]
Pooler.assignWork({ record, event: record[0] })
}
}
// pass read stream or something similar
(async (readStream) => {
const parser = parse({
delimiter: ','
})
const transformer = transform((record, callback) => {
jobBatch(record, callback)
}, {
parallel: cpus().length
})
readStream.pipe(parser).pipe(transformer).pipe(process.stdout)
})()
// workers/work.js:
const doWork = async (record) => {
}
process.on('message', async ({ record, event }) => {
process.send({ value: await doWork(record), event })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment