Last active
August 25, 2016 17:01
-
-
Save MiguelCastillo/fd028a4e0a4937fcfe5f69ea08dc7da2 to your computer and use it in GitHub Desktop.
stream frp
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var stream = require("stream"); | |
var inherits = require("util").inherits; | |
/**
 * Default transform: hands every chunk to the handler unchanged.
 *
 * @param {*} chunk - Value received from the stream.
 * @param {string} encoding - Encoding supplied by the stream; not used here.
 * @param {{next: Function}} handler - Completion handler; `next` receives the chunk.
 */
function thru(chunk, encoding, handler) {
  var forward = handler.next;
  forward.call(handler, chunk);
}
/**
 * Wraps a Node-style completion callback in an object with one method per
 * outcome, so transforms can say what happened instead of juggling
 * positional arguments.
 *
 * @param {Function} next - Callback invoked as `next(error, chunk)`.
 * @returns {{next: Function, skip: Function, error: Function}}
 *   `next(chunk)` calls `next(null, chunk)`, `skip()` calls `next()` with no
 *   arguments, and `error(err)` calls `next(err)`.
 */
function handlerWrapper(next) {
  function emitChunk(chunk) {
    next(null, chunk);
  }

  function dropChunk() {
    next();
  }

  function fail(error) {
    next(error);
  }

  return {
    next: emitChunk,
    skip: dropChunk,
    error: fail
  };
}
/**
 * Adapts a handler-style transform into a `(chunk, encoding, next)` function.
 *
 * The returned function builds a handler around `next` with `handlerWrapper`
 * and invokes `transform(chunk, encoding, handler)`. Anything `transform`
 * throws synchronously is reported through `handler.error`.
 *
 * NOTE(review): if `transform` throws after it has already called a handler
 * method, `next` ends up being invoked a second time — confirm whether
 * callers can hit that path.
 *
 * @param {Function} transform - Called as `transform(chunk, encoding, handler)`.
 * @returns {Function} Function taking `(chunk, encoding, next)`.
 */
function streamTransform(transform) {
  function runTransform(chunk, encoding, next) {
    var handler = handlerWrapper(next);

    try {
      transform(chunk, encoding, handler);
    } catch (failure) {
      handler.error(failure);
    }
  }

  return runTransform;
}
/**
 * Transform stream constructor; may be called with or without `new`.
 *
 * @param {Object} [options] - Options passed to `stream.Transform`. When
 *   falsy, `{ objectMode: true }` is used instead.
 * @param {Function} [transform] - Handler-style transform invoked as
 *   `transform(chunk, encoding, handler)`. When falsy, `thru` is used.
 * @returns {xfrp} A new instance when called without `new`.
 */
function xfrp(options, transform) {
  var calledAsConstructor = this instanceof xfrp;
  if (!calledAsConstructor) {
    return new xfrp(options, transform);
  }

  var settings = options ? options : { objectMode: true };
  stream.Transform.call(this, settings);

  var step = transform ? transform : thru;
  this._transform = streamTransform(step);
}
// Derive xfrp from stream.Transform first, then attach the chainable
// operators as instance methods.
inherits(xfrp, stream.Transform);

Object.assign(xfrp.prototype, {
  filter: filter,
  map: map,
  forEach: forEach,
  merge: merge,
  splice: splice,
  toJSON: toJSON,
  toString: toString
});
// Static shortcuts: each one creates a default xfrp stream and immediately
// applies the instance method of the same name, returning that method's result.

/**
 * @param {Function} test - Predicate forwarded to the instance `filter`.
 * @param {Object} [options] - Stream options forwarded to the instance `filter`.
 */
xfrp.filter = function(test, options) {
  var head = new xfrp();
  return head.filter(test, options);
};

/**
 * @param {Function} transform - Mapper forwarded to the instance `map`.
 * @param {Object} [options] - Stream options forwarded to the instance `map`.
 */
xfrp.map = function(transform, options) {
  var head = new xfrp();
  return head.map(transform, options);
};

/**
 * @param {Function} cb - Callback forwarded to the instance `forEach`.
 * @param {Object} [options] - Stream options forwarded to the instance `forEach`.
 */
xfrp.forEach = function(cb, options) {
  var head = new xfrp();
  return head.forEach(cb, options);
};
/**
 * Static shortcut for the instance `toString`: creates a default xfrp stream
 * and applies `toString` to it.
 *
 * Note that this is an own property of the constructor, so it shadows the
 * `toString` that functions normally inherit.
 *
 * @param {Object} [options] - Stream options forwarded to the instance
 *   `toString`, matching the other static shortcuts. Omit for the defaults.
 * @returns {*} Whatever the instance `toString` returns.
 */
xfrp.toString = function(options) {
  return new xfrp().toString(options);
};

/**
 * Static shortcut for the instance `toJSON`: creates a default xfrp stream
 * and applies `toJSON` to it.
 *
 * @param {Object} [options] - Stream options forwarded to the instance
 *   `toJSON`, matching the other static shortcuts. Omit for the defaults.
 * @returns {*} Whatever the instance `toJSON` returns.
 */
xfrp.toJSON = function(options) {
  return new xfrp().toJSON(options);
};
/**
 * Pipes this stream into a new xfrp stage that forwards only the chunks for
 * which `test(chunk)` is truthy; all other chunks are skipped.
 *
 * @param {Function} test - Predicate called with each chunk.
 * @param {Object} [options] - Options for the new xfrp stage.
 * @returns {*} The result of `this.pipe(stage)`.
 */
function filter(test, options) {
  function keepMatching(chunk, encoding, handler) {
    if (!test(chunk)) {
      handler.skip();
      return;
    }
    handler.next(chunk);
  }

  var stage = new xfrp(options, keepMatching);
  return this.pipe(stage);
}
/**
 * Pipes this stream into a new xfrp stage that replaces each chunk with
 * `transform(chunk)`.
 *
 * @param {Function} transform - Mapper called with each chunk.
 * @param {Object} [options] - Options for the new xfrp stage.
 * @returns {*} The result of `this.pipe(stage)`.
 */
function map(transform, options) {
  function applyMapper(chunk, encoding, handler) {
    var mapped = transform(chunk);
    handler.next(mapped);
  }

  var stage = new xfrp(options, applyMapper);
  return this.pipe(stage);
}
/**
 * Pipes this stream into a new xfrp stage that calls `cb(chunk)` for every
 * chunk and then forwards the same chunk; the return value of `cb` is ignored.
 *
 * @param {Function} cb - Callback called with each chunk.
 * @param {Object} [options] - Options for the new xfrp stage.
 * @returns {*} The result of `this.pipe(stage)`.
 */
function forEach(cb, options) {
  function observe(chunk, encoding, handler) {
    cb(chunk);
    handler.next(chunk);
  }

  var stage = new xfrp(options, observe);
  return this.pipe(stage);
}
/**
 * Pipes this stream into a new xfrp stage that parses each chunk as JSON.
 *
 * Buffers are converted to a string first (using the chunk's encoding, unless
 * that encoding is reported as "buffer") and then parsed; strings are parsed
 * directly; every other value is forwarded untouched.
 *
 * @param {Object} [options] - Options for the new xfrp stage.
 * @returns {*} The result of `this.pipe(stage)`.
 */
function toJSON(options) {
  function parseChunk(chunk, encoding, handler) {
    var parsed;

    if (Buffer.isBuffer(chunk)) {
      // "buffer" is not a real character encoding, so fall back to the default.
      var charset = encoding === "buffer" ? undefined : encoding;
      parsed = JSON.parse(chunk.toString(charset));
    } else if (typeof chunk === "string") {
      parsed = JSON.parse(chunk);
    } else {
      parsed = chunk;
    }

    handler.next(parsed);
  }

  var stage = new xfrp(options, parseChunk);
  return this.pipe(stage);
}
/**
 * Pipes this stream into a new xfrp stage that turns each chunk into a line
 * of text terminated by "\n".
 *
 * Buffers are decoded with the chunk's encoding (unless that encoding is
 * reported as "buffer"); every other value goes through `JSON.stringify`.
 *
 * @param {Object} [options] - Options for the new xfrp stage.
 * @returns {*} The result of `this.pipe(stage)`.
 */
function toString(options) {
  function stringifyChunk(chunk, encoding, handler) {
    var text;

    if (Buffer.isBuffer(chunk)) {
      // "buffer" is not a real character encoding, so fall back to the default.
      text = chunk.toString(encoding === "buffer" ? undefined : encoding);
    } else {
      text = JSON.stringify(chunk);
    }

    handler.next(text + "\n");
  }

  var stage = new xfrp(options, stringifyChunk);
  return this.pipe(stage);
}
/**
 * Pipes every stream passed as an argument into this stream.
 *
 * NOTE(review): the inputs are piped with default options. Node documents
 * `pipe` as ending the destination when the source ends unless
 * `{ end: false }` is given, so the first input to finish presumably ends
 * this stream as well — confirm that is the intended behaviour.
 *
 * @param {...Object} inputs - Streams whose `pipe` is called with this stream.
 * @returns {Object} This stream, so calls can be chained.
 */
function merge() {
  var target = this;
  var inputs = Array.prototype.slice.call(arguments);

  for (var i = 0; i < inputs.length; i++) {
    inputs[i].pipe(target);
  }

  return target;
}
/**
 * Fans this stream out through every stream passed as an argument and
 * collects their output in a single new default xfrp stream.
 *
 * For each argument `input`, this stream is piped into `input`, and the
 * result of that pipe is piped into the collecting stream.
 *
 * @param {...Object} inputs - Intermediate streams to route this stream through.
 * @returns {xfrp} The collecting stream.
 */
function splice() {
  var source = this;
  var joined = xfrp(null, null);
  var branches = Array.prototype.slice.call(arguments);

  for (var i = 0; i < branches.length; i++) {
    source.pipe(branches[i]).pipe(joined);
  }

  return joined;
}
module.exports = xfrp; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
That can be used for something like