Skip to content

Instantly share code, notes, and snippets.

@calvinmetcalf
Created February 17, 2014 13:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calvinmetcalf/9050186 to your computer and use it in GitHub Desktop.
Save calvinmetcalf/9050186 to your computer and use it in GitHub Desktop.
'use strict';
var inherits = require('util').inherits;
var Transform = require('readable-stream').Transform;
module.exports = Stream;
inherits(Stream, Transform);
//can take either 1 or 2 functions, or an array
//the first function is called with chunk and next, once for each item in the stream
//next must be called exactly once per item, either with an error or with nothing
//to pass values through use this.push
//the optional second function is flush, called after all items
//if called with an array, turns it into a stream
function Stream(transform, flush) {
  //support being called as a plain factory, without new
  if (!(this instanceof Stream)) {
    return new Stream(transform, flush);
  }
  var stream = this;
  var options = {
    objectMode: true,
    decodeStrings: false
  };
  Transform.call(stream, options);

  //array form: every item is written into the new stream
  //NOTE(review): the stream is not ended here, so consumers waiting for
  //'end' (or a flush) will wait until someone calls end() - confirm intended
  if (Array.isArray(transform)) {
    transform.forEach(function (entry) {
      stream.write(entry);
    });
    return;
  }

  //anything that is not a function leaves the prototype's _transform in
  //place; flush is ignored in that case
  if (typeof transform !== 'function') {
    return;
  }

  //function form: drop the encoding argument and run the user's function
  //with the stream as this, so it can call this.push
  stream._transform = function (chunk, encoding, done) {
    transform.call(stream, chunk, done);
  };
  if (typeof flush === 'function') {
    stream._flush = flush;
  }
}
// default transform function
Stream.prototype._transform = function (chunk, encoding, done) {
  //pass-through: used when the constructor did not install its own
  //_transform; the encoding argument is not used
  this.push(chunk);
  done();
};
//if map is called with a function that takes 2 arguments,
//it is used as a transform function and called with item and next;
//pass values through with this.push
Stream.prototype.map = function (func) {
  //a function declaring exactly one parameter is treated as synchronous;
  //anything else is used directly as a (chunk, next) transform function
  var isSync = func.length === 1;
  return isSync ? this.mapSync(func) : this.pipe(new Stream(func));
};
//if called with a function with 1 argument
//treats it as a sync function and passes the return value through
Stream.prototype.mapSync = function (func) {
  //runs func on each chunk (with the mapping stream as this) and pushes
  //whatever it returns
  //NOTE(review): a null return is pushed as-is, which Node streams treat
  //as end-of-data - confirm mappers never return null
  function applyMapper(chunk, next) {
    var mapped = func.call(this, chunk);
    this.push(mapped);
    next();
  }
  var mapper = new Stream(applyMapper);
  return this.pipe(mapper);
};
//synchronous reduce: func is called as func(acc, item) and returns the new acc; start value is optional
Stream.prototype.reduce = function (func, start) {
  //track "do we have a value yet" explicitly instead of testing acc for
  //undefined, so a reducer that returns undefined does not silently
  //restart the fold from the next item
  var hasAcc = typeof start !== 'undefined';
  //seed the accumulator directly rather than writing start into the
  //stream, so the seed never has to travel through the writable side
  //(this makes a null seed usable)
  var acc = start;
  var redStream = new Stream(function (chunk, next) {
    if (hasAcc) {
      acc = func(acc, chunk);
    } else {
      //no start value: the first item becomes the initial accumulator
      acc = chunk;
      hasAcc = true;
    }
    next();
  }, function (next) {
    //emit the single result once the input is exhausted; nothing is
    //emitted for an empty, unseeded input, and null/undefined results are
    //skipped because streams reserve null for end-of-data
    if (hasAcc && acc !== null && typeof acc !== 'undefined') {
      this.push(acc);
    }
    next();
  });
  return this.pipe(redStream);
};
//returns a new stream with any streams in the input
//stream flattened out.
//non stream items are passed through
Stream.prototype.flatten = function () {
  //one level is flattened: items emitted by an inner stream are forwarded
  //as they are, even if they are streams themselves
  var outStream = new Stream(function (item, next) {
    //'end' and 'error' can both fire for one inner stream, but next must
    //run exactly once per item
    var settled = false;
    function settle(err) {
      if (settled) {
        return;
      }
      settled = true;
      next(err);
    }

    //anything with a pipe method is treated as a stream; the truthiness
    //check keeps a null/undefined item from throwing on property access
    if (!(item && typeof item.pipe === 'function')) {
      //push to the readable side; write() would feed the item straight
      //back into this same transform function
      outStream.push(item);
      settle();
      return;
    }

    //forward the inner stream's items to the readable side; piping it
    //into outStream would queue them behind the item being processed
    //right now, which cannot finish until the inner stream ends
    item.on('data', function (inner) {
      outStream.push(inner);
    });
    item.on('end', function () {
      settle();
    });
    //report inner stream failures through next so outStream emits 'error'
    item.on('error', settle);
  });
  //no flush handler: Transform signals 'end' itself once the input has
  //ended and every pending next has been called
  return this.pipe(outStream);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment