Working with streams in node can be a scary and daunting task. The first time I saw the stream handbook, it left me quite perplexed. After seeing stuff like through, through2, and @acrute doing some stream kung fu on the job, I knew it was time. Streams are actually pretty straightforward once you get the hang of things. Read the handbook and the node docs and eventually your epiphany shall be born.

I think one of the most confusing things for me was that Readable streams are for reading values that are produced by you, while Writable streams are for dealing with values that are to be consumed by you. Say I wanted to put (write) some data from another stream into a file, and I wanted to take control of that process. Or say I just wanted to log (write) the results of a stream to, say, the console. These would be prime cases for Writable. However, what if I were to, say, stream the rows of a database result set and make them available to a consumer? I'd want Readable in that case so that clients could read the results of my payload. You could even have some fun with the code sample below and insert a timeout between your producer.push calls (so as to simulate network overhead).

Also note that I'm using the non-prototype version of creating custom streams. In the node docs and other places, you may see usages of util.inherits and some prototype-based stuff (inheriting from a base Stream construct in node).
From through2: "Inspired by Dominic Tarr's through in that it's so much easier to make a stream out of a function than it is to set up the prototype chain properly: through(function (chunk) { ... })."
Just note that the prototype stuff isn't really necessary anymore (guessing the core committers heard @rvagg loud and clear).
Hopefully this makes a little more sense to you now. I'm thankful that I finally understand this stuff to some extent, but we're still only scratching the surface here.
var Readable = require('stream').Readable;
var Transform = require('stream').Transform;
// without { objectMode: true }, we'll get:
// https://github.com/nodejs/node/blob/46b7d15/lib/events.js#L141
var producer = Readable({ objectMode: true });
var transform = Transform({ objectMode: true });
var index = 0;
var fruit = [
{ name: 'Apple', color: 'Red' },
{ name: 'Orange', color: 'Orange' },
{ name: 'Pear', color: 'light-green' }
];
producer._read = function () {
// stream one record at a time. when we're done, push null to
// terminate the stream/EOF (this is native node stuff)
// https://nodejs.org/api/stream.html#stream_object_mode
if (index < fruit.length) {
producer.push(fruit[index++]);
} else {
producer.push(null);
}
};
// transform each record into a string before piping to standard out.
//
// otherwise we'll get:
// https://github.com/nodejs/node/blob/f32a606/lib/net.js#L616-L617
transform._transform = function (data, encoding, callback) {
var json = JSON.stringify(data);
// guard against empty chunks: JSON.stringify returns undefined
// for undefined input, and pushing that downstream would throw
if (json) {
transform.push(json);
}
callback();
};
producer
.pipe(transform)
.pipe(process.stdout)
https://github.com/felixge/node-mysql/issues/741#issuecomment-36120903
https://github.com/substack/stream-handbook
https://nodejs.org/api/stream.html