Skip to content

Instantly share code, notes, and snippets.

@SydneyUni-Jim
Last active March 15, 2024 03:37
Show Gist options
  • Save SydneyUni-Jim/7168b3190ca49bc9a0f5fd90ee84ad13 to your computer and use it in GitHub Desktop.
Save SydneyUni-Jim/7168b3190ca49bc9a0f5fd90ee84ad13 to your computer and use it in GitHub Desktop.
Node.js Transform that executes in parallel with an async transformer
import { strict as assert } from 'node:assert'
import { Transform } from 'node:stream'
import os from 'node:os'
export class ParallelAsyncTransform extends Transform {
constructor(maxParallel = os.availableParallelism(), options = {}) {
super({
highWaterMark: maxParallel,
objectMode: true,
...options
})
this._buffer = []
this._flushed = false
this._maxParallel = maxParallel
this._onDrainedCallback = undefined
this._popCount = 0
this._writeCount = 0
this._signal = options?.signal
}
async _asyncTransform(chunk, encoding, signal){
throw new Error('_asyncTransform not implemented')
}
_read(size) {
// Nothing to do here. _write drives objects into the readable side of this duplex.
}
_write(chunk, encoding, callback) {
let writeCount = this._writeCount++
this._asyncTransform(chunk, encoding, this._signal)
.then(data => {
if (this.destroyed) return
this._buffer.push(data)
this._drain()
})
.catch(err => this.destroy(err))
if (this._writeCount - this._popCount < this._maxParallel) {
callback()
} else {
assert(!this._onDrainedCallback, '_onDrainedCallback is still defined')
this._onDrainedCallback = callback
}
}
_flush(callback) {
this._flushed = true
this._onDrainedCallback = callback
this._drain()
}
_drain() {
while (this._buffer.length) {
const data = this._buffer.pop()
this._popCount++
if (data == null) continue
if (!this.push(data)) break
}
if (this._drained && this._onDrainedCallback) {
const callback = this._onDrainedCallback
this._onDrainedCallback = undefined
callback()
}
}
get _drained() {
const transformsInFlight = this._writeCount - this._popCount
return this._flushed ? !transformsInFlight : transformsInFlight < this._maxParallel
}
}
@SydneyUni-Jim
Copy link
Author

This is a "modernised" version of https://github.com/mafintosh/parallel-transform.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment