Skip to content

Instantly share code, notes, and snippets.

@tizzo
Last active August 29, 2015 14:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tizzo/325da232900b869d8fc0 to your computer and use it in GitHub Desktop.
Save tizzo/325da232900b869d8fc0 to your computer and use it in GitHub Desktop.
Node.js Streams2 example: buffer all written data, then stream everything submitted so far followed by new data as it arrives
node_modules
*.sw*
Original source 1
Original source 2
Original source 3
var split2 = require('split2');
var through2 = require('through2');
var duplexify = require('duplexify');
var fs = require('fs');
/**
 * In-memory replay store for stream data.
 *
 * Chunks written through a stream from createWriteStream() are appended to
 * `data` and forwarded into the shared `dataStream`; streams from
 * createReadStream() are handed the stored chunks and then piped from
 * `dataStream`.
 *
 * @constructor
 */
var StreamObject = function() {
  // Every chunk written so far, in arrival order.
  this.data = [];
  // Bind all three factory methods so each keeps working when it is passed
  // around detached from the instance (e.g. as a callback).
  // createDuplexStream calls the other two through `this`, so it needs the
  // same treatment.
  this.createWriteStream = this.createWriteStream.bind(this);
  this.createReadStream = this.createReadStream.bind(this);
  this.createDuplexStream = this.createDuplexStream.bind(this);
  // Shared through2 stream (no transform function given) that writers pipe
  // into and readers pipe out of.
  this.dataStream = through2();
};
/**
 * Create a stream whose written chunks are recorded and shared.
 *
 * Each chunk is passed through unchanged, appended to this object's `data`
 * array, and piped into the shared `dataStream`.
 *
 * @returns {Object} The through2 stream to write into.
 */
StreamObject.prototype.createWriteStream = function() {
  var owner = this;

  // Invoked by through2 with the stream as `this`.
  function recordAndForward(chunk, encoding, done) {
    this.push(chunk);
    owner.data.push(chunk);
    done();
  }

  var writer = through2(recordAndForward);
  // { end: false }: ending this writer must not end the shared stream.
  writer.pipe(owner.dataStream, { end: false });
  return writer;
};
/**
 * Create a stream that is handed every stored chunk and then live data.
 *
 * The chunks currently held in `data` are pushed into a fresh through2 stream
 * in order, after which the shared `dataStream` is piped into it.
 *
 * NOTE(review): chunks still buffered inside `dataStream` when this is called
 * (e.g. written before any reader was piped from it) would presumably be
 * delivered again after the replay -- confirm.
 *
 * @returns {Object} The through2 stream to read from.
 */
StreamObject.prototype.createReadStream = function() {
  var readStream = through2();
  // Replay history with forEach rather than for...in, so that only the
  // array's indexed elements are visited, in index order; for...in would also
  // walk any other enumerable property found on the array.
  this.data.forEach(function(chunk) {
    readStream.push(chunk);
  });
  // { end: false }: the end of dataStream must not end this reader.
  this.dataStream.pipe(readStream, { end: false });
  return readStream;
};
/**
 * Create a single duplex stream backed by this object.
 *
 * Combines a fresh stream from createWriteStream() (writable side) with a
 * fresh stream from createReadStream() (readable side) using duplexify.
 *
 * @returns {Object} The stream returned by duplexify.
 */
StreamObject.prototype.createDuplexStream = function() {
  // The write side is created first, then the read side.
  var writableSide = this.createWriteStream();
  var readableSide = this.createReadStream();
  return duplexify(writableSide, readableSide);
};
// Shared store used by the demo pipelines below.
var streamObject = new StreamObject();
// Despite its name, this is the duplex stream from createDuplexStream():
// its writable side comes from createWriteStream() and its readable side from
// createReadStream().
var writeableStream = streamObject.createDuplexStream();
/**
 * Build a through2 stream that appends a newline to every chunk.
 *
 * @returns {Object} A new through2 stream on each call.
 */
var join2 = function() {
  // Invoked by through2 with the stream as `this`.
  function appendNewline(chunk, encoding, done) {
    this.push(chunk + '\n');
    done();
  }

  return through2(appendNewline);
};
// First subscriber: read the file 'data' as UTF-8 text, pipe it through
// split2() into the duplex endpoint, and print whatever the endpoint emits,
// one chunk per line. { end: false } keeps the endpoint open after the file
// ends, so later writes still go through.
var fileStream = fs.createReadStream('data', { encoding: 'utf8' });
fileStream
  .pipe(split2())
  .pipe(writeableStream, { end: false })
  .pipe(join2())
  .pipe(process.stdout);
// Second subscriber: an independent read stream from the same store, with each
// chunk prefixed so its output can be told apart on stdout.
streamObject.createReadStream()
  .pipe(through2(function tagChunk(chunk, encoding, done) {
    this.push('Second subscribed stream: ' + chunk);
    done();
  }))
  .pipe(join2())
  .pipe(process.stdout);
// Write a numbered chunk into the duplex endpoint every 500 ms.
var counter = 1;
setInterval(function writeDynamicChunk() {
  writeableStream.write('Dynamic data ' + counter);
  counter += 1;
}, 500);
{
"name": "statey-streams",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"duplexify": "^3.4.2",
"split2": "^1.0.0",
"through2": "^2.0.0"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment