Skip to content

Instantly share code, notes, and snippets.

@MiguelCastillo
Last active August 25, 2016 17:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MiguelCastillo/fd028a4e0a4937fcfe5f69ea08dc7da2 to your computer and use it in GitHub Desktop.
Save MiguelCastillo/fd028a4e0a4937fcfe5f69ea08dc7da2 to your computer and use it in GitHub Desktop.
stream frp
var stream = require("stream");
var inherits = require("util").inherits;
function thru(chunk, encoding, handler) {
handler.next(chunk);
}
function handlerWrapper(next) {
return {
next: function(chunk) {
next(null, chunk);
},
skip: function() {
next();
},
error: function(error) {
next(error);
}
};
}
function streamTransform(transform) {
return function(chunk, encoding, next) {
var handler = handlerWrapper(next);
try {
transform(chunk, encoding, handler);
}
catch(ex) {
handler.error(ex);
}
};
}
function xfrp(options, transform) {
if (!(this instanceof xfrp)) {
return new xfrp(options, transform);
}
stream.Transform.call(this, options || { objectMode: true });
this._transform = streamTransform(transform || thru);
}
inherits(xfrp, stream.Transform);
xfrp.prototype.filter = filter;
xfrp.prototype.map = map;
xfrp.prototype.forEach = forEach;
xfrp.prototype.merge = merge;
xfrp.prototype.splice = splice;
xfrp.prototype.toJSON = toJSON;
xfrp.prototype.toString = toString;
xfrp.filter = function(test, options) {
return new xfrp().filter(test, options);
};
xfrp.map = function(transform, options) {
return new xfrp().map(transform, options);
};
xfrp.forEach = function(cb, options) {
return new xfrp().forEach(cb, options);
};
xfrp.toString = function() {
return new xfrp().toString();
};
xfrp.toJSON = function() {
return new xfrp().toJSON();
};
function filter(test, options) {
return this.pipe(new xfrp(options, function(chunk, encoding, handler) {
if (test(chunk)) {
handler.next(chunk);
}
else {
handler.skip();
}
}));
}
function map(transform, options) {
return this.pipe(new xfrp(options, function(chunk, encoding, handler) {
handler.next(transform(chunk));
}));
}
function forEach(cb, options) {
return this.pipe(new xfrp(options, function(chunk, encoding, handler) {
cb(chunk);
handler.next(chunk);
}));
}
function toJSON(options) {
return this.pipe(new xfrp(options, function(chunk, encoding, handler) {
handler.next(
Buffer.isBuffer(chunk) ? JSON.parse(chunk.toString(encoding === "buffer" ? undefined : encoding)) :
typeof chunk === "string" ? JSON.parse(chunk) :
chunk
);
}));
}
function toString(options) {
return this.pipe(new xfrp(options, function(chunk, encoding, handler) {
handler.next((Buffer.isBuffer(chunk) ? chunk.toString(encoding === "buffer" ? undefined : encoding) : JSON.stringify(chunk)) + "\n");
}));
}
function merge() {
var x = this;
Array.prototype.slice.call(arguments)
.forEach(function(input) {
input.pipe(x);
});
return x;
}
function splice() {
var source = this;
var x = xfrp(null, null);
Array.prototype.slice.call(arguments)
.forEach(function(input) {
source.pipe(input).pipe(x);
});
return x;
}
module.exports = xfrp;
@MiguelCastillo
Copy link
Author

That can be used for something like

var xfrp = require("./index.js");
var fs = require("fs");

fs
  .createReadStream("./sample.json")
  .pipe(xfrp())
  .toJSON()
  .forEach(function(item) {
    console.log(item);
  })
  .toString()
  .pipe(process.stdout);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment