Skip to content

Instantly share code, notes, and snippets.

@euforic
Created May 4, 2013 04:52
Show Gist options
  • Save euforic/5516244 to your computer and use it in GitHub Desktop.
Save euforic/5516244 to your computer and use it in GitHub Desktop.
Mega Stream
/**
* Expose `Stream`.
*/
if ('undefined' !== typeof module) {
module.exports = Stream;
}
/**
 * Create a `Stream`, or extend an existing object with the stream members.
 *
 * With a truthy `obj`, the work is delegated to `mixin` and its result is
 * returned. Without one, nothing is returned, so `new Stream()` evaluates to
 * the freshly constructed instance.
 *
 * @param {Object} [obj] object to extend in place
 * @return {Object|undefined} result of `mixin(obj)` when `obj` is given
 * @api public
 */
function Stream(obj) {
  return obj ? mixin(obj) : undefined;
}
/**
 * Copy every enumerable member reachable from `Stream.prototype` onto `obj`.
 *
 * @param {Object} obj object that receives the stream members
 * @return {Object} the same `obj`, now extended
 * @api private
 */
function mixin(obj) {
  var proto = Stream.prototype;
  var name;
  // `for...in` is deliberate: it also visits enumerable inherited members.
  for (name in proto) {
    obj[name] = proto[name];
  }
  return obj;
}
/**
 * Locate `obj` inside `arr`.
 *
 * Uses the array's own `indexOf` when the runtime provides one and falls back
 * to a strict-equality scan otherwise.
 *
 * @param {Array} arr list to search
 * @param {Object} obj value to look for
 * @return {Number} position of the first match, or -1 when absent
 * @api private
 */
function index(arr, obj){
  if ([].indexOf) {
    return arr.indexOf(obj);
  }
  // Fallback for runtimes without Array.prototype.indexOf.
  var i = 0;
  while (i < arr.length) {
    if (arr[i] === obj) {
      return i;
    }
    i += 1;
  }
  return -1;
}
/**
 * Default `readable` flag shared by every stream through the prototype.
 * `pause()` sets it to `false` on the instance and `resume()` sets it back to
 * `true`; the drain handler installed by `pipe()` reads it on the source.
 * @type {Boolean}
 */
Stream.prototype.readable = true;
/**
 * Default `writable` flag shared by every stream through the prototype.
 * `pipe()` only forwards chunks to a destination whose `writable` is truthy.
 * @type {Boolean}
 */
Stream.prototype.writable = true;
/**
 * Feed a chunk into the stream.
 *
 * - While the stream is paused the chunk is appended to `_queue` and nothing
 *   is emitted.
 * - If the stream defines `_transform(data, done)`, the chunk is handed to it
 *   together with a no-op `done` callback; `push()` calls made while the
 *   transform is running are emitted as `data` events.
 * - Otherwise the chunk is emitted unchanged as a `data` event.
 *
 * @param {*} data chunk to write
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.write = function (data) {
  if (this.paused) {
    // `paused` may have been set without going through pause(), so make sure
    // the queue exists before appending to it.
    (this._queue = this._queue || []).push(data);
    return this;
  }
  if (this._transform) {
    // Flag the synchronous extent of the transform so push() can tell that it
    // is being called from inside it. The previous value is restored so that
    // re-entrant writes nest correctly and a throwing transform cannot leave
    // the flag stuck.
    var wasTransforming = Boolean(this._transforming);
    this._transforming = true;
    try {
      this._transform(data, function () {});
    } finally {
      this._transforming = wasTransforming;
    }
    return this;
  }
  this.emit('data', data);
  return this;
};
/**
 * Publish a chunk.
 *
 * Called while `_transform` is running, the chunk is emitted directly as a
 * `data` event (routing it through `write()` again would re-enter the
 * transform). Called from anywhere else it behaves like `write(data)`.
 *
 * The "inside the transform" check uses the `_transforming` flag maintained
 * by `write()` rather than `arguments.callee.caller`: the latter throws a
 * TypeError in strict mode / ES modules and only recognised a direct call
 * from `_transform`, not one made from a nested callback.
 *
 * @param {*} data chunk to publish
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.push = function (data) {
  if (this._transforming) {
    return this.emit('data', data);
  }
  this.write(data);
  return this;
};
/**
 * Pause the stream.
 *
 * Marks the stream as paused and not readable, makes sure the `_queue` array
 * exists (an existing queue is kept), then emits `pause`.
 *
 * @return {Stream} this stream, for chaining (consistent with `write`, `on`
 *   and `emit`)
 * @api public
 */
Stream.prototype.pause = function () {
  this.paused = true;
  this.readable = false;
  this._queue = this._queue || [];
  // Emit last so listeners observe the fully updated state.
  this.emit('pause');
  return this;
};
/**
 * Resume a paused stream and flush everything queued while it was paused.
 *
 * Clears the `paused` flag, emits `resume`, then replays the queued chunks in
 * order through `write()`, so they take the same path (including any
 * `_transform`) as chunks written while the stream is flowing.
 *
 * The queue is detached before it is replayed: each chunk is delivered
 * exactly once, and if a listener pauses the stream again mid-flush the
 * remaining chunks are simply re-queued by `write()` in their original order.
 *
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.resume = function () {
  var self = this;
  self.paused = false;
  self.emit('resume');
  var pending = self._queue;
  if (pending && pending.length) {
    self._queue = [];
    for (var i = 0; i < pending.length; i++) {
      self.write(pending[i]);
    }
  }
  // Stay non-readable if a listener paused the stream again during the flush.
  self.readable = !self.paused;
  return self;
};
/**
 * Forward everything this stream emits as `data` into `dest`.
 *
 * - Each chunk is written to `dest` while `dest.writable` is truthy. When
 *   `dest.write()` returns exactly `false` the source is paused (if it has a
 *   `pause` method) and resumed on the destination's next `drain` event.
 * - Unless `options.end === false`, the source's `end` event ends the
 *   destination and its `close` event destroys it; only the first of the two
 *   has an effect, and each call is skipped when `dest` lacks the method.
 * - An `error` on either side detaches the pipe and is re-thrown when the
 *   emitting stream has no other `error` listener.
 * - All handlers installed here are removed again on `end`/`close` of either
 *   side.
 *
 * Emits `pipe` on `dest` with the source as argument.
 *
 * @param {Object} dest destination; needs `write`, `on`, `removeListener`
 *   and `emit`
 * @param {Object} [options]
 * @param {Boolean} [options.end] pass `false` to leave `dest` open when the
 *   source ends or closes
 * @return {Object} `dest`, so pipes can be chained
 * @api public
 */
Stream.prototype.pipe = function (dest, options) {
  var source = this;
  var didOnEnd = false;

  function ondata(chunk) {
    if (dest.writable) {
      if (false === dest.write(chunk) && source.pause) {
        source.pause();
      }
    }
  }

  function ondrain() {
    // pause() clears `readable`, so a source paused by back-pressure must
    // also be recognised through its `paused` flag or it would never resume.
    if ((source.readable || source.paused) && source.resume) {
      source.resume();
    }
  }

  function onend() {
    if (didOnEnd) { return; }
    didOnEnd = true;
    // Guarded like destroy() below: not every destination implements end().
    if (typeof dest.end === 'function') { dest.end(); }
  }

  function onclose() {
    if (didOnEnd) { return; }
    didOnEnd = true;
    if (typeof dest.destroy === 'function') { dest.destroy(); }
  }

  function onerror(err) {
    cleanup();
    // `this` is the emitter the error was raised on; cleanup() has already
    // removed this handler, so a count of zero means nobody else is listening.
    if (this.listeners('error').length === 0) {
      throw err; // Unhandled stream error in pipe.
    }
  }

  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);
    source.removeListener('end', onend);
    dest.removeListener('end', cleanup);
    source.removeListener('close', onclose);
    dest.removeListener('close', cleanup);
    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);
    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);
  }

  source.on('data', ondata);
  dest.on('drain', ondrain);
  if (!options || options.end !== false) {
    source.on('end', onend);
    source.on('close', onclose);
  }
  source.on('error', onerror);
  dest.on('error', onerror);
  source.on('end', cleanup);
  source.on('close', cleanup);
  dest.on('end', cleanup);
  dest.on('close', cleanup);
  dest.emit('pipe', source);
  return dest;
};
/**
 * Register `fn` as a handler for `event`.
 *
 * Handlers are kept per event name in `_callbacks`, in registration order.
 *
 * @param {String} event event name
 * @param {Function} fn handler to add
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.on = function(event, fn){
  var registry = this._callbacks = this._callbacks || {};
  var handlers = registry[event];
  if (!handlers) {
    handlers = registry[event] = [];
  }
  handlers.push(fn);
  return this;
};
/**
 * Register `fn` for `event` so that it runs at most once.
 *
 * A wrapper is registered in place of `fn`; when invoked it first removes
 * itself via `off()` and then calls `fn` with the same `this` and arguments.
 * The wrapper is also stored on `fn._off` so that `off(event, fn)` can find
 * and remove it before it ever fires.
 *
 * @param {String} event event name
 * @param {Function} fn handler to run once
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.once = function(event, fn){
  var emitter = this;
  emitter._callbacks = emitter._callbacks || {};

  var wrapper = function () {
    emitter.off(event, wrapper);
    fn.apply(this, arguments);
  };

  fn._off = wrapper;
  emitter.on(event, wrapper);
  return emitter;
};
/**
 * Remove handlers. Also available as `removeListener` and
 * `removeAllListeners` (all three names are the same function).
 *
 * - No arguments: every handler for every event is removed.
 * - `event` only: every handler for that event is removed.
 * - `event` and `fn`: one registration of `fn` for that event is removed,
 *   whether it was added with `on()` or with `once()`.
 *
 * @param {String} [event] event name
 * @param {Function} [fn] handler to remove
 * @return {Stream} this stream, for chaining
 * @api public
 */
Stream.prototype.off =
Stream.prototype.removeListener =
Stream.prototype.removeAllListeners = function(event, fn){
  this._callbacks = this._callbacks || {};

  // all
  if (arguments.length === 0) {
    this._callbacks = {};
    return this;
  }

  // specific event
  var callbacks = this._callbacks[event];
  if (!callbacks) { return this; }

  // remove all handlers
  if (arguments.length === 1) {
    delete this._callbacks[event];
    return this;
  }

  // remove specific handler: look for the once() wrapper first, then for the
  // handler itself. `fn._off` survives after a once() handler has fired, so
  // without the fallback a later plain on(event, fn) could never be removed.
  var wrapper = fn && fn._off;
  var i = wrapper ? index(callbacks, wrapper) : -1;
  if (i === -1) { i = index(callbacks, fn); }
  if (i !== -1) { callbacks.splice(i, 1); }
  return this;
};
/**
 * Invoke every handler registered for `event`.
 *
 * Handlers run in registration order with the stream as `this` and with all
 * arguments after `event`. The handler list is copied before the first call,
 * so handlers added or removed during the emit do not affect this round.
 *
 * @param {String} event event name
 * @param {Mixed} ... arguments forwarded to each handler
 * @return {Stream} this stream, for chaining
 */
Stream.prototype.emit = function(event){
  this._callbacks = this._callbacks || {};

  var registered = this._callbacks[event];
  if (!registered) {
    return this;
  }

  var args = Array.prototype.slice.call(arguments, 1);
  var snapshot = registered.slice(0);
  var total = snapshot.length;
  var i = 0;
  while (i < total) {
    snapshot[i].apply(this, args);
    i += 1;
  }
  return this;
};
/**
 * List the handlers registered for `event`.
 *
 * Returns the stream's own handler array when one exists, otherwise a new
 * empty array.
 *
 * @param {String} event event name
 * @return {Array} handlers for `event`
 * @api public
 */
Stream.prototype.listeners = function(event){
  var registry = this._callbacks = this._callbacks || {};
  var handlers = registry[event];
  return handlers ? handlers : [];
};
/**
 * Report whether at least one handler is registered for `event`.
 *
 * @param {String} event event name
 * @return {Boolean} `true` when `listeners(event)` is non-empty
 * @api public
 */
Stream.prototype.hasListeners = function(event){
  var handlers = this.listeners(event);
  return Boolean(handlers.length);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment