Skip to content

Instantly share code, notes, and snippets.

@algesten
Last active November 27, 2016 11:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save algesten/c3cfb118a0823084fc5c27b57c9b7e54 to your computer and use it in GitHub Desktop.
Save algesten/c3cfb118a0823084fc5c27b57c9b7e54 to your computer and use it in GitHub Desktop.

Stream Errors

This is an investigation into how interconnected streams are handling emitted errors.

We want to test whether errors are propagated in either direction (upstream or downstream).

input.pipe(middle).pipe(output)

We also add error handlers on input/middle/output so that an unhandled 'error' event does not abort the process.

Error in the input

The error is not propagated in any direction. If we didn't have the error handler on the input, we wouldn't even know it happened.

    input        middle        output        notes
    -----        ------        ------        -----
    
1                pipe
2                              pipe
3   resume
4                resume
5   data
6                data
7                              _write <61 62>
8   error
9                                            input.on('error','Stop right there, mr!')

Error in the middle

An error in the middle only causes the middle to be unpiped from the input, nothing more. Again, we can only detect it by having an error handler on the stream causing the error. The input does stop pumping in more data.

    input        middle        output        notes
    -----        ------        ------        -----
    
1                pipe
2                              pipe
3   resume
4                resume
5   data
6                data
7                              _write <61 62>
8   data
9                error
10               unpipe
11                                           middle.on('error','We got a problem...')
12  readable

Error in the output

Errors in the output do not propagate either. The input keeps pushing data until the middle's highWaterMark is filled up, at which point the input pauses.

    input        middle        output        notes
    -----        ------        ------        -----
    
1                pipe
2                              pipe
3   resume
4                resume
5   data
6                data
7                              _write <61 62>
8   data
9                data
10                             _write <20 7a>
11                             error
12                             unpipe
13                                           output.on('error','Uh oh')
14  data
15               readable
16  data
17  data
18  data
19  data
20  data
21  data
22  data
23  data
24  data
25  pause
26  readable
let through2 = require('through2')
let TestReadable = require('./test-readable')
let TestWritable = require('./test-writable')
/**
 * Wraps `s.emit` so every emitted event is traced to stderr.
 *
 * Each call prints `name` followed by the event name via `console.error`
 * and then forwards all arguments to the original `emit`.
 *
 * @param {string} name - Label printed in front of each event name.
 * @param {Object} s - Emitter whose `emit` method is replaced in place.
 * @returns {Object} The same emitter `s`, to allow inline use.
 */
const patch = (name, s) => {
  const originalEmit = s.emit;
  s.emit = (...args) => {
    console.error(name, args[0]);
    // Hand back the original result: `emit` reports whether the event had
    // listeners, and callers of a patched emitter must still see that value.
    return originalEmit.apply(s, args);
  };
  return s;
};
// Source stage: pushes the queued payload at most two bytes at a time,
// with a 10 ms timer between pushes.
const input = patch(' ', new TestReadable({ chunkSize: 2, frequency: 10 }));

// Transform for the middle stage: passes chunks through untouched and fails
// through the callback as soon as a chunk contains the marker byte 'y'.
// Must stay a regular function because through2 supplies `this.push`.
const relay = function (chunk, enc, callback) {
  const poisoned = chunk.indexOf('y') !== -1;
  if (!poisoned) {
    this.push(chunk);
    callback();
    return;
  }
  callback(new Error("We got a problem..."));
};
const middle = patch(' ', through2({ highWaterMark: 10 }, relay));

// Sink stage: emits an error when a written chunk contains the marker 'z'.
const output = patch(' ', new TestWritable({ incrementAmount: 1, highWaterMark: 10 }));

// Marker bytes select which stage fails:
// x - crashes in input
// y - crashes in middle
// z - crashes in output
// This payload carries a 'z', so this run exercises the output error.
input.put("ab z defghijdefghijdefghijdefghijdefghijdefghijdefghijdefghijdefghij");
input.stop();
/**
 * Registers an 'error' listener on `s` that reports the error to stderr
 * in the form `<name>.on('error','<message>')`.
 *
 * @param {string} name - Label identifying the stream in the report.
 * @param {Object} s - Emitter to listen on.
 */
const onError = (name, s) => {
  const report = (err) => {
    console.error(`${name}.on('error','${err.message}')`);
  };
  s.on('error', report);
};
// Give every stage its own 'error' listener before wiring the pipeline, so
// an error on any of them is reported instead of going unhandled.
for (const [label, stage] of [[' input', input], [' middle', middle], [' output', output]]) {
  onError(label, stage);
}

// input -> middle -> output
const throughMiddle = input.pipe(middle);
throughMiddle.pipe(output);
var stream = require('stream');
var util = require('util');
/**
 * Readable stream for tests, fed from an in-memory byte queue.
 *
 * Bytes queued with `put()` are pushed downstream from a timer, at most
 * `chunkSize` bytes per tick. When a chunk about to be pushed contains the
 * byte 'x', the stream emits an 'error' instead of pushing that chunk.
 *
 * @param {Object} [opts] - Options; also handed to `stream.Readable`.
 * @param {number} [opts.frequency=10] - Delay in ms between pushes. Used
 *   as given whenever the key is present, so `0` is honoured.
 * @param {number} [opts.chunkSize=2] - Maximum bytes per pushed chunk.
 * @param {number} [opts.initialSize=1] - Initial queue capacity in bytes.
 * @param {number} [opts.incrementAmount=2] - Step, in bytes, by which the
 *   queue capacity grows.
 */
var TestReadable = module.exports = function(opts) {
  const that = this;
  const options = opts || {};
  stream.Readable.call(this, options);
  this.stopped = false;

  // Call through Object.prototype so an options object without its own
  // `hasOwnProperty` (e.g. created with Object.create(null)) still works.
  const frequency = Object.prototype.hasOwnProperty.call(options, 'frequency')
    ? options.frequency
    : 10;
  const chunkSize = options.chunkSize || 2;
  const initialSize = options.initialSize || 1;
  const incrementAmount = options.incrementAmount || 2;

  // `buffer` is the backing store; only its first `size` bytes are queued data.
  let size = 0;
  let buffer = Buffer.alloc(initialSize);

  // Timer callback: pushes the next chunk and re-arms itself while the
  // consumer accepts more data.
  const sendData = function() {
    const amount = Math.min(chunkSize, size);
    let sendMore = false;
    if (amount > 0) {
      // allocUnsafe is fine here: the chunk is fully overwritten by the copy.
      const chunk = Buffer.allocUnsafe(amount);
      buffer.copy(chunk, 0, 0, amount);
      if (chunk.indexOf('x') >= 0) {
        // Returning here leaves `sendData.timeout` set, so `_read` will not
        // re-arm the timer and no further data is sent after the error.
        // NOTE(review): presumably intentional for this experiment - confirm.
        that.emit('error', new Error("Stop right there, mr!"));
        return;
      }
      sendMore = that.push(chunk) !== false;
      // Shift the remaining queued bytes to the front of the buffer.
      buffer.copy(buffer, 0, amount, size);
      size -= amount;
    }
    if (size === 0 && that.stopped) {
      // Queue drained after stop(): signal end-of-stream.
      that.push(null);
    }
    sendData.timeout = sendMore ? setTimeout(sendData, frequency) : null;
  };

  /**
   * Marks the stream as finished: no more data may be queued, and the
   * stream ends once the queue is empty.
   *
   * @throws {Error} If the stream was already stopped.
   */
  this.stop = function() {
    if (that.stopped) {
      throw new Error('stop() called on already stopped ReadableStreamBuffer');
    }
    that.stopped = true;
    if (size === 0) {
      that.push(null);
    }
  };

  // Grows the backing buffer, in multiples of `incrementAmount`, until
  // `incomingDataSize` more bytes fit behind the queued data.
  const increaseBufferIfNecessary = function(incomingDataSize) {
    const free = buffer.length - size;
    if (free < incomingDataSize) {
      const factor = Math.ceil((incomingDataSize - free) / incrementAmount);
      const grown = Buffer.alloc(buffer.length + (incrementAmount * factor));
      buffer.copy(grown, 0, 0, size);
      buffer = grown;
    }
  };

  /**
   * Queues data to be emitted by the stream.
   *
   * @param {Buffer|*} data - A Buffer is queued as-is; anything else is
   *   converted to a string and encoded.
   * @param {string} [encoding='utf8'] - Encoding for non-Buffer data.
   * @throws {Error} If the stream has been stopped.
   */
  this.put = function(data, encoding) {
    if (that.stopped) {
      throw new Error('Tried to write data to a stopped ReadableStreamBuffer');
    }
    if (Buffer.isBuffer(data)) {
      increaseBufferIfNecessary(data.length);
      data.copy(buffer, size, 0);
      size += data.length;
      return;
    }
    const text = data + '';
    const textEncoding = encoding || 'utf8';
    // Measure with the same encoding used for the write; measuring as utf8
    // while writing e.g. hex would count bytes that are never written.
    const dataSizeInBytes = Buffer.byteLength(text, textEncoding);
    increaseBufferIfNecessary(dataSizeInBytes);
    buffer.write(text, size, textEncoding);
    size += dataSizeInBytes;
  };

  // Readable hook: arm the send timer unless one is already recorded.
  this._read = function() {
    if (!sendData.timeout) {
      sendData.timeout = setTimeout(sendData, frequency);
    }
  };
};
util.inherits(TestReadable, stream.Readable);
var util = require('util');
var stream = require('stream');
/**
 * Writable stream for tests.
 *
 * Logs every chunk it receives and emits an 'error' when a chunk contains
 * the byte 'z'.
 *
 * @param {Object} [opts] - Options handed to `stream.Writable`;
 *   `decodeStrings` is always forced to `true`.
 */
var TestWritable = module.exports = function(opts) {
  // Work on a copy so the caller's options object is not modified.
  const options = Object.assign({}, opts, { decodeStrings: true });
  stream.Writable.call(this, options);

  this._write = function(chunk, encoding, callback) {
    console.log(' _write', chunk);
    if (chunk.indexOf('z') >= 0) {
      this.emit('error', new Error("Uh oh"));
    }
    // The callback is invoked without an error even after the emit above.
    // NOTE(review): presumably deliberate, so that only the emitted 'error'
    // event is observed - confirm.
    callback();
  };
};
util.inherits(TestWritable, stream.Writable);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment