@kapetan
Last active August 29, 2015 14:14
Streams

Use pump instead of pipe.

a.pipe(b).pipe(c).on('error', callback);

Errors from streams a and b are not handled: pipe does not forward errors, so the listener only sees errors emitted by c.

var pump = require('pump');

pump(a, b, c, function(err) {
  if(err) return callback(err);
  callback(); // all streams finished without error
});
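Node 10 and later ship stream.pipeline, which follows the same approach as pump: it pipes the streams together and reports the first error from any of them to a single callback. The sketch below swaps it in for pump so it runs without dependencies; failingSource and runPipeline are hypothetical names used only for illustration.

```javascript
var stream = require('stream');

// Hypothetical source that fails as soon as it is read from.
function failingSource() {
  return new stream.Readable({
    read: function() {
      this.destroy(new Error('source failed'));
    }
  });
}

// Equivalent of pump(a, b, c, callback): the callback receives the first
// error from any stream in the chain, and every stream is torn down.
function runPipeline(callback) {
  stream.pipeline(
    failingSource(),
    new stream.PassThrough(),
    new stream.PassThrough(),
    callback
  );
}

runPipeline(function(err) {
  console.log(err ? 'failed: ' + err.message : 'done');
});
```

With a.pipe(b).pipe(c) the same source error would be emitted on a with no listener attached, and would crash the process.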

At the very least, wrap the callback with once when listening for stream events.

s.on('end', callback);
s.on('error', callback);
s.on('close', function() {
  callback(new Error('Stream closed'));
});

There is no guarantee that callback will be called only once; for example, a stream that fails can emit 'error' followed by 'close'.

var once = require('once');
callback = once(callback);
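The once module wraps a function so that only its first call has any effect. Below is a minimal hand-rolled sketch of that behaviour (onceSketch and watch are hypothetical stand-ins, not the module's actual source) applied to the listeners above. Destroying a stream with an error emits 'error' and then 'close', yet the callback runs only once.

```javascript
var stream = require('stream');

// Minimal stand-in for the once module: later calls are ignored.
function onceSketch(fn) {
  var called = false;
  return function() {
    if (called) return;
    called = true;
    return fn.apply(this, arguments);
  };
}

// Same listeners as above, with the callback guarded.
function watch(s, callback) {
  callback = onceSketch(callback);
  s.on('end', callback);
  s.on('error', callback);
  s.on('close', function() {
    callback(new Error('Stream closed'));
  });
}

var calls = [];
var s = new stream.PassThrough();
watch(s, function(err) {
  calls.push(err ? err.message : 'end');
});
s.destroy(new Error('boom')); // emits 'error', then 'close'
```

Without the guard, calls would end up holding both 'boom' and 'Stream closed'.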

Even better, use end-of-stream.

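var eos = require('end-of-stream');

// eos calls back once: on 'end'/'finish', on 'error', or with an
// error if the stream closes prematurely.
eos(s, function(err) {
  if(err) return callback(err);
  callback();
});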
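Node 10 and later also include stream.finished, which fills the same role as end-of-stream: it calls back once the stream has ended, finished, errored or closed prematurely. A small sketch using it in place of eos, assuming Node 12+ for Readable.from; whenDone is a hypothetical helper name.

```javascript
var stream = require('stream');

// Hypothetical helper: resolves with the error, or undefined on success.
function whenDone(s) {
  return new Promise(function(resolve) {
    stream.finished(s, resolve);
  });
}

var good = stream.Readable.from(['a', 'b', 'c']);
var bad = new stream.Readable({
  read: function() {
    this.destroy(new Error('read failed'));
  }
});

var results = Promise.all([whenDone(good), whenDone(bad)]);

// Consume both streams so that they can actually end or fail.
good.resume();
bad.resume();

results.then(function(errs) {
  console.log(errs[0], errs[1].message); // prints: undefined read failed
});
```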

Remember to set highWaterMark for object streams.

var through = require('through2');
var stream = require('stream');
var util = require('util');

var s = through({ objectMode: true });

var MyTransform = function() {
  stream.Transform.call(this, { objectMode: true });
}

util.inherits(MyTransform, stream.Transform);

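In Node 0.10, highWaterMark for object streams defaults to the same 16k used for byte streams, which here means 16k entries, so the stream may buffer a large amount of input in memory. The through2 module also suffers from this when you pass objectMode: true yourself. Node 0.12 and later default object streams to 16 entries, but setting highWaterMark explicitly makes the limit clear regardless of version.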

// Shortcut for objectMode: true and highWaterMark: 16
var s = through.obj();

var MyTransform = function() {
  stream.Transform.call(this, { objectMode: true, highWaterMark: 16 });
}
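To see what highWaterMark controls, the sketch below counts how many objects can be written to an object-mode writable before write() returns false, i.e. before the stream asks the producer to back off. It uses only Node's core stream module; writesUntilBackpressure is a hypothetical helper, and the sink acknowledges each write on a later tick to simulate a slow consumer.

```javascript
var stream = require('stream');

// Hypothetical helper: how many objects fit in the buffer before
// write() signals backpressure by returning false?
function writesUntilBackpressure(highWaterMark) {
  var sink = new stream.Writable({
    objectMode: true,
    highWaterMark: highWaterMark,
    write: function(obj, encoding, callback) {
      setImmediate(callback); // slow consumer: finish on a later tick
    }
  });

  var count = 0;
  var ok = true;
  while (ok) {
    ok = sink.write({ n: count });
    count++;
  }
  return count;
}

console.log(writesUntilBackpressure(16));        // 16
console.log(writesUntilBackpressure(16 * 1024)); // 16384
```

With a 16k limit, even a producer that respects backpressure can queue 16384 objects in memory before it is asked to pause.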

Use pumpify, duplexify or pump when returning chained streams.

var pump = require('pump');

var readable = function() {
  return a.pipe(b);
}

pump(readable(), c, callback);

Errors from stream a are not handled: pipe does not forward them to b, and pump only sees the stream returned by readable().

var readable = function() {
  return pump(a, b);
}

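pump attaches error handlers to both a and b, so an error on a is no longer uncaught. Without a callback, though, pump silently swallows the error, and it never reaches the caller.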

var pumpify = require('pumpify');

var readable = function() {
  return pumpify(a, b);
}

pumpify returns a new duplex stream that writes into a and reads from b. Errors on a or b are emitted on the returned stream.

Don't write to streams directly from within a stream.

var through = require('through2');
var pump = require('pump');

var a = readable();
var b = through.obj(function(data, encoding, callback) {
  data.array.forEach(function(obj) {
    c.write(obj);
  });

  callback();
});
var c = writable();

pump(a, b, c, callback);

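Stream b breaks encapsulation and makes the flow hard to follow: it writes to c directly, outside the pipeline, and ignores the return value of c.write, so backpressure from c never reaches b. Use this.push instead.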

var b = through.obj(function(data, encoding, callback) {
  var self = this;

  data.array.forEach(function(obj) {
    self.push(obj);
  });

  callback();
});
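A self-contained version of the push-based transform, using Node's core stream.Transform in place of through2 and stream.pipeline in place of pump (assumes Node 12+ for Readable.from). The input objects with an array property are made-up sample data.

```javascript
var stream = require('stream');

// Splits each { array: [...] } object into its elements by pushing them
// downstream, instead of writing to another stream directly.
function splitArrays() {
  return new stream.Transform({
    objectMode: true,
    highWaterMark: 16,
    transform: function(data, encoding, callback) {
      var self = this;
      data.array.forEach(function(obj) {
        self.push(obj);
      });
      callback();
    }
  });
}

var received = [];
var sink = new stream.Writable({
  objectMode: true,
  highWaterMark: 16,
  write: function(obj, encoding, callback) {
    received.push(obj);
    callback();
  }
});

var done = new Promise(function(resolve, reject) {
  stream.pipeline(
    stream.Readable.from([{ array: [1, 2] }, { array: [3] }]),
    splitArrays(),
    sink,
    function(err) {
      if (err) return reject(err);
      resolve(received);
    }
  );
});

done.then(function(items) {
  console.log(items); // [ 1, 2, 3 ]
});
```

Because the elements go through push, the sink's buffer limit applies to them, and the transform no longer needs to know which stream comes next.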

Don't take streams as arguments.

var work = function(s) {
  // ...
}

work(stream);

Return a writable stream instead. The flow is nicer, and there is no confusion about who is responsible for error handling.

var pump = require('pump');
pump(stream, work(), callback);
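One way to follow this with nothing but Node core, using stream.pipeline in place of pump: the hypothetical work() below returns a writable that counts the bytes written into it, and the caller, who owns the pipeline, also owns the error handling. The bytes property and the countBytes helper are illustrative assumptions (Readable.from needs Node 12+).

```javascript
var stream = require('stream');

// Hypothetical work(): returns a writable instead of taking a stream.
function work() {
  var sink = new stream.Writable({
    write: function(chunk, encoding, callback) {
      this.bytes += chunk.length; // `this` is the writable itself
      callback();
    }
  });
  sink.bytes = 0;
  return sink;
}

// The caller wires up the pipeline and handles every error in one place.
function countBytes(source, callback) {
  var sink = work();
  stream.pipeline(source, sink, function(err) {
    if (err) return callback(err);
    callback(null, sink.bytes);
  });
}

countBytes(stream.Readable.from(['abc', 'de']), function(err, n) {
  console.log(err || n); // 5
});
```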

Don't mix callbacks and streams (there are always exceptions).

var pump = require('pump');

var work = function(callback) {
  async(function(err, s) {
    callback(err, s);
  });
}

work(function(err, stream) {
  if(err) return callback(err);
  pump(stream, a, callback);
});

Consider streams part of async flow control, together with callbacks and promises; you wouldn't normally mix those much either. Instead, return some kind of pass-through stream. This gives a much cleaner flow for the consumer of the stream.

var stream = require('stream');
var pump = require('pump');

var work = function() {
  var pass = new stream.PassThrough();

  async(function(err, s) {
    if(err) return pass.emit('error', err);

    pump(s, pass, function(err) {
      if(err) pass.emit('error', err);
    });
  });

  return pass;
}

pump(work(), a, callback);
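A runnable sketch of the same pattern with Node core only: stream.pipeline stands in for pump, and the hypothetical openSourceLater() stands in for the async function above, handing back a readable (or an error) on a later tick. It fails the pass-through with pass.destroy(err) rather than emitting 'error' by hand; consume is also a hypothetical helper (Readable.from needs Node 12+).

```javascript
var stream = require('stream');

// Hypothetical async source: calls back on a later tick with a readable,
// or with an error when `fail` is true.
function openSourceLater(fail, callback) {
  setImmediate(function() {
    if (fail) return callback(new Error('open failed'));
    callback(null, stream.Readable.from(['x', 'y']));
  });
}

// Returns a stream synchronously; the real source is attached later.
function work(fail) {
  var pass = new stream.PassThrough();
  openSourceLater(fail, function(err, s) {
    if (err) return pass.destroy(err);
    stream.pipeline(s, pass, function(err) {
      if (err) pass.destroy(err);
    });
  });
  return pass;
}

// The consumer sees a plain stream and a single error callback.
function consume(fail, callback) {
  var chunks = [];
  var sink = new stream.Writable({
    write: function(chunk, encoding, cb) {
      chunks.push(chunk.toString());
      cb();
    }
  });
  stream.pipeline(work(fail), sink, function(err) {
    if (err) return callback(err);
    callback(null, chunks.join(''));
  });
}

consume(false, function(err, text) { console.log(err || text); }); // xy
consume(true, function(err) { console.log(err.message); });        // open failed
```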

The pumpify and duplexify modules can be used instead of the pass-through stream, since both make it possible to set the stream chain asynchronously.
