Skip to content

Instantly share code, notes, and snippets.

@TooTallNate
Created October 17, 2012 22:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TooTallNate/3908775 to your computer and use it in GitHub Desktop.
Save TooTallNate/3908775 to your computer and use it in GitHub Desktop.
A quick `Tee` stream class (streams2). Pipe data to it, call .fork() to get new Readable streams, and read the data from them all. Useful for piping 1 readable stream to 2 or more writable streams.
/**
* Module dependencies.
*/
var assert = require('assert');
var stream = require('stream');
var Writable = stream.Writable;
var PassThrough = stream.PassThrough;
var inherits = require('util').inherits;
var debug = require('debug')('stream:tee');
/**
* Module exports.
*/
module.exports = Tee;
/**
* The `Tee` class is a writable stream that you write data to. Then when you want
* to read that data from another stream just call the `.fork()` function which
* will return a new PassThrough stream instance that outputs the same data written
* to the Tee instance. You can call `fork()` as many times as necessary to fork
* off the data multiple times.
*
* @api public
*/
function Tee (opts) {
if (!(this instanceof Tee)) return new Tee(opts);
Writable.call(this, opts);
this.streams = [];
}
inherits(Tee, Writable);
/**
* Creates and returns a new PassThrough stream instance for this Tee instance.
*
* @param {Object} opts optional "options" to instantiate the PassThrough with
* @return {Stream} a new PassThrough stream instance
* @api public
*/
Tee.prototype.fork = function (opts) {
var stream = new PassThrough(opts);
this.streams.push(stream);
return stream;
};
/**
* The base Writable class' `_write()` implementation.
*/
Tee.prototype._write = function (chunk, done) {
var count = this.streams.length;
var len = chunk.length;
debug('_write() (%d bytes, %d streams)', chunk.length, count);
this.streams.forEach(function (stream, i) {
if (false === stream.write(chunk)) {
debug('need to wait for "drain" for stream %d', i);
stream.once('drain', function () {
debug('got "drain" event for stream %d', i);
--count || done();
});
} else {
--count;
}
});
if (0 === count) done();
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment