Created
February 17, 2014 13:07
-
-
Save calvinmetcalf/9050186 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict';

// Prototype-inheritance helper from Node's util module.
var inherits = require('util').inherits;
// Transform base class supplied by the readable-stream package.
var Transform = require('readable-stream').Transform;

// `Stream` is a function declaration further down in this file, so it is
// hoisted and already defined when these two statements run.
module.exports = Stream;
inherits(Stream, Transform);
/**
 * Object-mode Transform stream that can be built from plain functions or
 * from an array. Works with or without `new`.
 *
 * - `Stream(transform)` / `Stream(transform, flush)`:
 *   `transform(chunk, next)` is invoked once per item with `this` bound to
 *   the stream. Call `this.push(value)` to emit output and call `next()`
 *   (or `next(err)`) exactly once per item. The optional `flush` is
 *   installed as the stream's `_flush` hook.
 * - `Stream(array)`: every element of the array is written into the
 *   stream, which otherwise uses the default `_transform`.
 *   NOTE(review): the array form never calls `end()`, so the resulting
 *   stream stays open until the caller ends it -- confirm this is intended.
 * - Anything else: a stream using the default `_transform`.
 *
 * @param {Function|Array} [transform] - per-item transform, or items to write
 * @param {Function} [flush] - flush hook; only used when `transform` is a function
 */
function Stream(transform, flush) {
  if (!(this instanceof Stream)) {
    return new Stream(transform, flush);
  }

  var stream = this;
  var options = {
    objectMode: true,
    decodeStrings: false
  };
  Transform.call(stream, options);

  if (Array.isArray(transform)) {
    transform.forEach(function (element) {
      stream.write(element);
    });
    return;
  }

  if (typeof transform !== 'function') {
    return;
  }

  // Adapt the two-argument user callback to the three-argument _transform
  // signature; the encoding argument is not forwarded.
  stream._transform = function (chunk, encoding, done) {
    transform.call(stream, chunk, done);
  };

  if (typeof flush === 'function') {
    stream._flush = flush;
  }
}
/**
 * Default transform, used when the constructor is not given a function:
 * forwards each item unchanged and signals completion.
 *
 * @param {*} item - the incoming item
 * @param {*} encoding - ignored
 * @param {Function} done - completion callback, called with no arguments
 */
Stream.prototype._transform = function (item, encoding, done) {
  var stream = this;
  stream.push(item);
  done();
};
/**
 * Pipes this stream through `func` and returns the result of that pipe.
 *
 * A function declaring exactly one parameter is treated as synchronous and
 * handed to `mapSync`. Any other function is used as a transform callback,
 * called as `func(item, next)` with `this` bound to the new stream; it emits
 * values with `this.push` and must call `next` once per item.
 *
 * @param {Function} func - sync mapper (arity 1) or transform callback
 * @returns {*} whatever `mapSync` or `pipe` returns
 */
Stream.prototype.map = function (func) {
  var isSync = func.length === 1;
  return isSync ? this.mapSync(func) : this.pipe(new Stream(func));
};
/**
 * Pipes this stream through a synchronous mapper: each item is replaced by
 * the return value of `func(item)`, with `this` bound to the mapping stream.
 *
 * @param {Function} func - synchronous mapper
 * @returns {*} the result of piping this stream into the mapping stream
 */
Stream.prototype.mapSync = function (func) {
  var mapper = new Stream(function (item, done) {
    var mapped = func.call(this, item);
    this.push(mapped);
    done();
  });
  return this.pipe(mapper);
};
/**
 * Folds every item of this stream into a single value. The returned stream
 * emits that value once, when the source has ended.
 *
 * `func` is synchronous and is called as `func(accumulator, item)`; its
 * return value becomes the next accumulator. When `start` is omitted (or
 * `undefined`) the first item seeds the accumulator and `func` is first
 * called for the second item.
 *
 * @param {Function} func - synchronous reducer `(accumulator, item) => next`
 * @param {*} [start] - optional initial accumulator
 * @returns {*} the result of piping this stream into the reducing stream
 */
Stream.prototype.reduce = function (func, start) {
  var acc = start;
  // Track seeding with an explicit flag instead of testing
  // `acc === undefined`: a reducer that legitimately returns `undefined`
  // must not cause the next item to silently replace the accumulator.
  var seeded = typeof start !== 'undefined';
  var redStream = new Stream(function (chunk, next) {
    if (seeded) {
      acc = func(acc, chunk);
    } else {
      acc = chunk;
      seeded = true;
    }
    next();
  }, function (next) {
    // Flush hook: emit the final accumulator once all items were consumed.
    this.push(acc);
    next();
  });
  return this.pipe(redStream);
};
/**
 * Returns a stream in which every stream found in this stream is replaced
 * by the items it emits (one level deep); non-stream items pass through
 * unchanged. Inner streams are drained one at a time, in arrival order, so
 * output order follows input order.
 *
 * An item is treated as a stream when it has a `pipe` method. An error
 * emitted by an inner stream is forwarded as an error of the returned stream.
 *
 * @returns {*} the result of piping this stream into the flattening stream
 */
Stream.prototype.flatten = function () {
  var outStream = new Stream(function (item, next) {
    if (!item || typeof item.pipe !== 'function') {
      // Emit on the readable side. Writing the item back into `outStream`
      // would feed it through this same transform again, forever.
      outStream.push(item);
      return next();
    }

    // Make sure the transform callback runs at most once even if the inner
    // stream reports both an error and an end.
    var finished = false;
    function finish(err) {
      if (finished) {
        return;
      }
      finished = true;
      next(err);
    }

    // Forward inner items straight to the readable side. Backpressure from
    // downstream is not propagated to the inner stream.
    item.on('data', function (data) {
      outStream.push(data);
    });
    item.on('end', function () {
      finish();
    });
    item.on('error', finish);
  });
  // No flush hook: the stream ends by itself once every item, including
  // every inner stream, has completed.
  return this.pipe(outStream);
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment