Skip to content

Instantly share code, notes, and snippets.

@mtth
Created February 14, 2015 20:39
Show Gist options
  • Save mtth/f7e076ad10646db855a0 to your computer and use it in GitHub Desktop.
Save mtth/f7e076ad10646db855a0 to your computer and use it in GitHub Desktop.
BatchReadable stream
var stream = require('stream'),
util = require('util');
/**
* Batch loaded readable stream.
*
* @param {Function} `fn(cb)`, where `cb(err, iter)`. Function used to feed
* the stream. `iter` should either be an iterator or `null` to signal EOT.
* @param {Object} `opts` Options forwarded to `stream.Readable`, along with
* the following: `batchHighWatermark`, the maximum number of batches
* preemptively fetched [default: `2`]; `concurrency`, the maximum number of
* parallel batch calls [default: `1`]. Note that order isn't guaranteed when
* multiple requests are made in parallel.
*
* This is useful when feeding a stream from a secondary thread (e.g. when
* writing a CPP addon).
*
*/
function BatchReadable(fn, opts) {
opts = opts || {};
var batchHighWaterMark = opts.batchHighWaterMark || 2;
var concurrency = opts.concurrency || 1;
stream.Readable.call(this, opts);
var self = this;
var iters = [];
var needPush = false;
var finishing = false;
var pending = 0;
this._read = function () {
var elem = null;
while (iters.length && (elem = iters[0].next()).done) {
// Consume all iterators until we find something, or run out.
iters.shift();
}
if (elem && elem.value) {
this.push(elem.value);
} else if (finishing && !pending) {
this.push(null);
} else {
needPush = true;
}
if (
pending < concurrency &&
!finishing &&
iters.length < batchHighWaterMark
) {
pending++;
fn(batchCb);
}
};
function batchCb(err, iter) {
pending--;
if (err) {
self.emit('error', err);
// Don't return though.
}
if (!iter) {
finishing = true;
} else {
iters.push(iter);
}
if (needPush) {
needPush = false;
self._read();
}
}
}
util.inherits(BatchReadable, stream.Readable);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment