Created
February 14, 2015 20:39
-
-
Save mtth/f7e076ad10646db855a0 to your computer and use it in GitHub Desktop.
BatchReadable stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var stream = require('stream'), | |
util = require('util'); | |
/** | |
* Batch loaded readable stream. | |
* | |
* @param {Function} `fn(cb)`, where `cb(err, iter)`. Function used to feed | |
* the stream. `iter` should either be an iterator or `null` to signal EOT. | |
* @param {Object} `opts` Options forwarded to `stream.Readable`, along with | |
* the following: `batchHighWatermark`, the maximum number of batches | |
* preemptively fetched [default: `2`]; `concurrency`, the maximum number of | |
* parallel batch calls [default: `1`]. Note that order isn't guaranteed when | |
* multiple requests are made in parallel. | |
* | |
* This is useful when feeding a stream from a secondary thread (e.g. when | |
* writing a CPP addon). | |
* | |
*/ | |
function BatchReadable(fn, opts) { | |
opts = opts || {}; | |
var batchHighWaterMark = opts.batchHighWaterMark || 2; | |
var concurrency = opts.concurrency || 1; | |
stream.Readable.call(this, opts); | |
var self = this; | |
var iters = []; | |
var needPush = false; | |
var finishing = false; | |
var pending = 0; | |
this._read = function () { | |
var elem = null; | |
while (iters.length && (elem = iters[0].next()).done) { | |
// Consume all iterators until we find something, or run out. | |
iters.shift(); | |
} | |
if (elem && elem.value) { | |
this.push(elem.value); | |
} else if (finishing && !pending) { | |
this.push(null); | |
} else { | |
needPush = true; | |
} | |
if ( | |
pending < concurrency && | |
!finishing && | |
iters.length < batchHighWaterMark | |
) { | |
pending++; | |
fn(batchCb); | |
} | |
}; | |
function batchCb(err, iter) { | |
pending--; | |
if (err) { | |
self.emit('error', err); | |
// Don't return though. | |
} | |
if (!iter) { | |
finishing = true; | |
} else { | |
iters.push(iter); | |
} | |
if (needPush) { | |
needPush = false; | |
self._read(); | |
} | |
} | |
} | |
util.inherits(BatchReadable, stream.Readable); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment