Use `pump` instead of `pipe`.
// Anti-pattern: pipe() returns its destination, so this listener is attached
// only to c; 'error' events emitted by a or b have no handler here.
a.pipe(b).pipe(c).on('error', callback);
Errors are not handled for streams `a` and `b`.
var pump = require('pump');

// pump pipes a -> b -> c and reports a pipeline failure through this single
// callback instead of needing an 'error' listener on every stream.
pump(a, b, c, function onPipelineDone(err) {
  if (err) {
    return callback(err);
  }
});
At least use `once` when dealing with stream events.
// Each of the three listeners below invokes callback independently, so a
// stream that emits more than one of these events calls it more than once.
s.on('end', callback)
  .on('error', callback)
  .on('close', function onClose() {
    callback(new Error('Stream closed'));
  });
There is no guarantee that `callback` will only be called once.
var once = require('once');
// Rebind callback to the wrapper returned by once(); per the once module's
// contract the wrapped function runs at most one time, however often it is called.
callback = once(callback);
Even better, use `end-of-stream`.
var eos = require('end-of-stream');

// end-of-stream invokes this handler a single time: with an error when the
// stream fails or closes prematurely, and without one when it finishes cleanly.
eos(s, function(err) {
  if (err) return callback(err);
  // Report success too; otherwise the caller never learns that the stream ended.
  callback();
});
Remember to set `highWaterMark` for object streams.
var through = require('through2');
var stream = require('stream');
var util = require('util');
// Object-mode through2 stream created without an explicit highWaterMark.
var s = through({ objectMode: true });
// Object-mode Transform subclass; only objectMode is passed to the base
// constructor, so the buffering limit is left at the stream's default.
var MyTransform = function() {
stream.Transform.call(this, { objectMode: true });
}
// Set up prototype inheritance from stream.Transform.
util.inherits(MyTransform, stream.Transform);
By default `highWaterMark` is set really high for object streams (16k entries), which makes the stream buffer the input in memory. The `through2` module also suffers from this.
// through.obj() is shorthand for objectMode: true and highWaterMark: 16
var s = through.obj();

// Hand-written Transform with the same settings: object mode, and a
// highWaterMark of 16 passed explicitly to the base constructor.
var MyTransform = function() {
  var options = { objectMode: true, highWaterMark: 16 };
  stream.Transform.call(this, options);
};
Use `pumpify`, `duplexify` or `pump` when returning chained streams.
var pump = require('pump');

// Anti-pattern kept for illustration: pipe() returns b, so the caller only
// ever receives b and has no way to observe errors emitted by a.
var readable = function() {
  return a.pipe(b);
};

// readable() already yields b, so it has to be pumped into the next stream (c)
// rather than back into b itself.
pump(readable(), c, callback);
Errors are not handled for stream `a`.
// pump returns the last stream it was given (b). No completion callback is
// passed here, so a pipeline error is not reported to anyone.
var readable = function() {
  var last = pump(a, b);
  return last;
};
No uncaught error for stream `a`, but the error is still lost.
var pumpify = require('pumpify');

// pumpify combines a and b into one stream that is handed to the caller,
// so the whole chain is exposed as a single object.
var readable = function() {
  return pumpify(a, b);
};
Returns a new stream which wraps `a` and `b`. Errors on `a` and `b` are propagated.
Don't write to streams directly from within a stream.
var through = require('through2');
var pump = require('pump');
var a = readable();
// Anti-pattern: this transform writes to c directly rather than emitting its
// own output, and it discards the return value of c.write().
var b = through.obj(function(data, encoding, callback) {
data.array.forEach(function(obj) {
c.write(obj);
});
callback();
});
var c = writable();
// c is also the final destination of the pump chain below.
pump(a, b, c, callback);
Stream `b` breaks encapsulation and introduces complicated flow. Use `this.push` instead.
// Emit every element of the incoming array as the transform's own output.
var b = through.obj(function(data, encoding, callback) {
  // The transform is passed as forEach's thisArg so push() is reachable
  // inside the iteration callback.
  data.array.forEach(function(obj) {
    this.push(obj);
  }, this);
  callback();
});
Don't take streams as arguments.
// Shape being discouraged: work receives an existing stream as its argument.
var work = function(s) {
// ...
}
// The caller hands its stream over to work.
work(stream);
Return a writable stream instead. Nicer flow, and no confusion about who is responsible for error handling.
var pump = require('pump');
// work() is called without a stream argument; its return value is used as the
// destination of the pump chain, and callback receives the pipeline result.
pump(stream, work(), callback);
Don't mix callbacks and streams (there are always exceptions).
var pump = require('pump');

// Shape being discouraged: the stream is delivered through a callback, so the
// consumer has to handle a callback error before it can start pumping.
var work = function(callback) {
  async(function(error, source) {
    callback(error, source);
  });
};

work(function onStream(err, stream) {
  if (err) {
    return callback(err);
  }
  pump(stream, a, callback);
});
Consider streams part of async flow control, together with callbacks and promises; you wouldn't normally mix these too much. Instead, return some kind of pass-through stream. This gives a much cleaner flow for the consumer of the stream.
var stream = require('stream');
var pump = require('pump');

// Returns a PassThrough stream synchronously; the real source stream is
// obtained asynchronously and pumped into it once available.
var work = function() {
  var pass = new stream.PassThrough();

  async(function(err, s) {
    // Surface setup failures on the returned stream instead of via a callback.
    if (err) return pass.emit('error', err);

    pump(s, pass, function(err) {
      // Forward pipeline errors to consumers of the returned stream.
      if (err) pass.emit('error', err);
    });
  });

  return pass;
};

pump(work(), a, callback);
The `pumpify` and `duplexify` modules can be used instead of the pass-through stream, as they make it possible to set the stream chain asynchronously.