Skip to content

Instantly share code, notes, and snippets.

@IronSavior
Last active September 18, 2017 19:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save IronSavior/0fdd16ee060ddbd04bc156a7eb0eec89 to your computer and use it in GitHub Desktop.
Save IronSavior/0fdd16ee060ddbd04bc156a7eb0eec89 to your computer and use it in GitHub Desktop.
A through2-compatible Transform stream which processes chunks with limited (or unlimited) concurrency
"use strict";
const {Transform} = require('readable-stream');
// Symbol key under which every ConcurrentTransform instance keeps its internal bookkeeping
const PRIVATE = Symbol('ConcurrentTransform private state');
// Default chunk handler: hands the chunk straight back to the completion callback, unchanged
const NOOP_TRANSFORM = (chunk, _encoding, callback) => {
  return callback(null, chunk);
};
// Default flush handler: reports completion immediately
const NOOP_FLUSH = callback => {
  return callback();
};
// Transform stream which processes chunks with limited (or unlimited) concurrency.
// The stream-level callback for a chunk is invoked as soon as its task has been started, so
// further chunks can be accepted while earlier ones are still in flight. Each result is pushed
// when its own task completes.
class ConcurrentTransform extends Transform {
  // @param opts {Object} Stream options. The keys below are consumed here; the rest go to Transform.
  // @param opts.concurrent {Number|String} Concurrency limit: integer > 0 or Infinity.
  //   Fractions are truncated; undefined/null select the default (default concurrency: 1)
  // @param opts.transform {Function} (chunk, encoding, done) task, called with the stream as `this`
  //   (default: pass the chunk through unchanged)
  // @param opts.flush {Function} (done) hook, called with the stream as `this` once no task is
  //   running and the stream is finishing (default: no-op)
  // @throws {Error} When the concurrency limit is not Infinity or a number >= 1 after truncation
  constructor( opts = {} ){
    // Work on a copy so the deletes below never touch the caller's object
    opts = Object.assign({}, opts);
    const requested = opts.concurrent == null ? 1 : opts.concurrent;
    let limit;
    if( requested === Infinity ) limit = Infinity;
    // Numbers are truncated arithmetically: parseInt() would stringify them first and misread
    // exponent notation (5e-7 would parse as 5, 1e21 as 1)
    else if( typeof requested === 'number' ) limit = Math.trunc(requested);
    else limit = parseInt(requested, 10);
    delete opts.concurrent;
    // Written as a negated comparison so that NaN is rejected along with 0 and negatives
    if( !(limit >= 1) ) throw new Error(`Concurrent option must be integer > 0`);
    // transform/flush are stripped before super() so the base class never receives them;
    // they are invoked through the private state instead
    const user_transform = opts.transform || NOOP_TRANSFORM;
    delete opts.transform;
    const user_flush = opts.flush || NOOP_FLUSH;
    delete opts.flush;
    super(opts);
    this[PRIVATE] = {
      running: 0,
      limit,
      user_transform: user_transform.bind(this),
      start_transform: start_transform.bind(this),
      user_flush: user_flush.bind(this),
    };
  }
  // Start the task for a chunk, or park the call when the concurrency limit is reached.
  // @param chunk {*} Chunk handed over by the stream
  // @param encoding {String} Encoding reported for the chunk
  // @param transform_started {Function} Stream callback, invoked once the task has been started
  _transform( chunk, encoding, transform_started ){
    const priv = this[PRIVATE];
    if( priv.running >= priv.limit ){
      // At capacity: hold on to the call (callback not yet invoked) until a running task completes
      priv.queued_transform = [chunk, encoding, transform_started];
      return;
    }
    priv.start_transform(chunk, encoding, transform_started);
  }
  // Remember the stream's flush callback; finalize right away only when nothing is running,
  // otherwise the last task to complete triggers the final flush.
  // @param flush_cb {Function} Stream callback to invoke when flushing is done
  _flush( flush_cb ){
    this[PRIVATE].flush_cb = flush_cb;
    if( !this.running ) final_flush.call(this);
  }
  // @returns {Number} Count of currently running tasks
  get running(){
    return this[PRIVATE].running;
  }
  // @returns {Number} Maximum number of tasks allowed to run concurrently
  get concurrent(){
    return this[PRIVATE].limit;
  }
}
// Finalize stream state (called once all transforms have completed).
// Expects `this` to be the stream: takes the stored flush callback out of the private state
// and passes it to the user's flush handler.
function final_flush(){
  const state = this[PRIVATE];
  const {flush_cb: pending_flush} = state;
  // Removed before the user handler runs, so no stored callback is left once it has been handed off
  delete state.flush_cb;
  state.user_flush(pending_flush);
}
// Manage lifecycle of one chunk transform invocation. Expects `this` to be the stream.
// Counts the task as running, acknowledges the chunk to the stream, runs the user transform,
// and on completion pushes the result and either starts the queued chunk or, when a flush is
// pending and nothing else is running, finalizes the stream.
// @param chunk {*} Chunk to hand to the user transform
// @param encoding {String} Encoding reported for the chunk
// @param transform_started {Function} Stream callback, invoked before the user transform runs
// @returns {*} Whatever the user transform returns
function start_transform( chunk, encoding, transform_started ){
  const priv = this[PRIVATE];
  let completed = false;
  priv.running++;
  // Acknowledge right away so the stream may deliver the next chunk while this one is in flight
  transform_started();
  return priv.user_transform(chunk, encoding, (err, result) => {
    // A second completion for the same chunk would decrement the counter twice, which lets more
    // tasks run than the limit allows and can trigger the final flush early; report it instead
    if( completed ) return this.emit('error', new Error('Transform callback called multiple times'));
    completed = true;
    priv.running--;
    if( err ) return this.emit('error', err);
    if( result !== undefined ) this.push(result);
    if( priv.queued_transform ){
      // A slot just opened up: start the chunk that was parked while the limit was reached
      const [next_chunk, next_encoding, next_transform_started] = priv.queued_transform;
      delete priv.queued_transform;
      priv.start_transform(next_chunk, next_encoding, next_transform_started);
    }
    else if( priv.flush_cb && !priv.running ) final_flush.call(this);
  });
}
// Create a stream via through2()-compatible interface.
// Accepts either (transform, flush) or (opts, transform, flush); in the second form, keys
// present in opts take precedence over the positional functions.
// @returns {ConcurrentTransform} The newly constructed stream
ConcurrentTransform.through = (...args) => {
  const [head, second, third] = args;
  const opts = typeof head === 'function'
    ? {transform: head, flush: second}
    : Object.assign({transform: second, flush: third}, head);
  return new ConcurrentTransform(opts);
};
// Create a stream via through2::obj()-compatible interface.
// Same call shapes as ConcurrentTransform.through, with objectMode defaulting to true
// (an objectMode key in the supplied options still overrides that default).
// @returns {ConcurrentTransform} The newly constructed stream
ConcurrentTransform.through.obj = (...args) => {
  const [head, ...rest] = args;
  if( typeof head === 'function' ) return ConcurrentTransform.through({objectMode: true}, ...args);
  return ConcurrentTransform.through(Object.assign({objectMode: true}, head), ...rest);
};
module.exports = ConcurrentTransform;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment