Created
October 17, 2012 22:33
-
-
Save TooTallNate/3908775 to your computer and use it in GitHub Desktop.
A quick `Tee` stream class (streams2). Pipe data to it, call .fork() to get new Readable streams, and read the data from them all. Useful for piping 1 readable stream to 2 or more writable streams.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Module dependencies. | |
*/ | |
var assert = require('assert'); | |
var stream = require('stream'); | |
var Writable = stream.Writable; | |
var PassThrough = stream.PassThrough; | |
var inherits = require('util').inherits; | |
var debug = require('debug')('stream:tee'); | |
/** | |
* Module exports. | |
*/ | |
module.exports = Tee; | |
/** | |
* The `Tee` class is a writable stream that you write data to. Then when you want | |
* to read that data from another stream just call the `.fork()` function which | |
* will return a new PassThrough stream instance that outputs the same data written | |
* to the Tee instance. You can call `fork()` as many times as necessary to fork | |
* off the data multiple times. | |
* | |
* @api public | |
*/ | |
function Tee (opts) { | |
if (!(this instanceof Tee)) return new Tee(opts); | |
Writable.call(this, opts); | |
this.streams = []; | |
} | |
inherits(Tee, Writable); | |
/** | |
* Creates and returns a new PassThrough stream instance for this Tee instance. | |
* | |
* @param {Object} opts optional "options" to instantiate the PassThrough with | |
* @return {Stream} a new PassThrough stream instance | |
* @api public | |
*/ | |
Tee.prototype.fork = function (opts) { | |
var stream = new PassThrough(opts); | |
this.streams.push(stream); | |
return stream; | |
}; | |
/** | |
* The base Writable class' `_write()` implementation. | |
*/ | |
Tee.prototype._write = function (chunk, done) { | |
var count = this.streams.length; | |
var len = chunk.length; | |
debug('_write() (%d bytes, %d streams)', chunk.length, count); | |
this.streams.forEach(function (stream, i) { | |
if (false === stream.write(chunk)) { | |
debug('need to wait for "drain" for stream %d', i); | |
stream.once('drain', function () { | |
debug('got "drain" event for stream %d', i); | |
--count || done(); | |
}); | |
} else { | |
--count; | |
} | |
}); | |
if (0 === count) done(); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment