Skip to content

Instantly share code, notes, and snippets.

@bterlson
Created October 28, 2014 23:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bterlson/c4040b7ad68b9e15d0de to your computer and use it in GitHub Desktop.
Save bterlson/c4040b7ad68b9e15d0de to your computer and use it in GitHub Desktop.
stream-bifurcate
var _ = require('highland');
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
util.inherits(Tributary, Writable);
function Tributary() {
Writable.call(this, { objectMode: true });
this.forks = [];
this._queue = [];
this._waiting = [];
}
Tributary.prototype.fork = function() {
this.forks.push(new Distributary(this));
return this.forks[this.forks.length - 1]
}
Tributary.prototype._write = function(chunk, enc, cb) {
this._queue.push([chunk, cb]);
this._drain();
}
Tributary.prototype._drain = function() {
if(this.forks.length === 0) return;
if(this._waiting.length === 0) return;
if(this._queue.length === 0) return;
var job = this._queue.shift();
// find most empty waiting stream
var s = this._waiting.reduce(function(least, f, i) {
if(f._readableState.buffer.length < least[0]._readableState.buffer.length)
return [f, i];
return least;
}, [this._waiting[0], 0]);
this._waiting.splice(s[1], 1);
s[0].push(job[0])
job[1]();
}
util.inherits(Distributary, Readable);
function Distributary(trib) {
Readable.call(this, { objectMode: true });
this._tributary = trib;
}
Distributary.prototype._read = function() {
if(!this._tributary._waiting.indexOf(this) > -1) {
this._tributary._waiting.push(this);
}
this._tributary._drain();
}
// Testing purposes
util.inherits(Counter, Readable);
function Counter(opt) {
Readable.call(this, { objectMode: true });
this._max = 100;
this._index = 1;
}
Counter.prototype._read = function() {
var i = this._index++;
setTimeout(function() {
if (i > this._max) this.push(null);
else this.push({this: i});
}.bind(this));
};
var c = new Counter();
var trib = new Tributary();
var f1 = trib.fork();
var f2 = trib.fork();
var f3 = trib.fork();
_(c).pipe(trib);
_(f1).ratelimit(1, 400).each(function(chunk) {
console.log("F1: ", chunk);
});
_(f2).ratelimit(1, 200).each(function(chunk) {
console.log("F2: ", chunk);
});
_(f3).ratelimit(1, 800).each(function(chunk) {
console.log("F3: ", chunk);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment