Skip to content

Instantly share code, notes, and snippets.

@skenqbx
Created August 26, 2012 17:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skenqbx/3481927 to your computer and use it in GitHub Desktop.
Save skenqbx/3481927 to your computer and use it in GitHub Desktop.
Buffered Stream
'use strict';
/* load node modules */
var util = require('util');
var Stream = require('stream');
/**
* BufferedStream
* @constructor
* @extends {Stream}
*/
function BufferedStream() {
Stream.call(this);
this.readable = true;
this.writable = true;
this.length = 0;
this._pendings = [];
this._ended = false;
this._paused = false;
}
util.inherits(BufferedStream, Stream);
module.exports = BufferedStream;
/**
* Write data to the stream.
*/
BufferedStream.prototype.write = function(data) {
if (this.writable) {
if (!this._paused) {
this.emit('data', data);
} else {
this.length = this._pendings.push(data);
}
return true;
} else {
this.emit('error', new Error('Stream is not writable'));
}
};
/**
* Terminate the stream.
*/
BufferedStream.prototype.end = function(opt_data) {
if (!this._ended) {
if (opt_data) {
this.write(opt_data);
}
this._ended = true;
this.writable = false;
if (!this._paused) {
this.emit('end');
}
} else {
this.emit('error', new Error('Stream already ended'));
}
};
/**
* Resume 'data' events.
* @return {boolean}
*/
BufferedStream.prototype.resume = function() {
if (this._ended && this.length === 0) {
return false;
}
while (this._pendings.length > 0) {
this.emit('data', this._pendings.shift());
}
this.length = 0;
this._paused = false;
if (this._ended) {
this.emit('end');
}
return true;
};
/**
* Pause 'data' events.
*/
BufferedStream.prototype.pause = function() {
this._paused = true;
};
'use strict';
/* load modules */
var assert = require('assert');
var Stream = require('stream');
var BufferedStream = require('./buffered_stream');
/* initialize streams */
var input = new Stream();
var buffer = new BufferedStream();
input.pipe(buffer);
/* enable buffering */
buffer.pause();
/* setup listeners */
var count = 0;
var tests = ['event1', 'event2', 'event3', 'event4', 'event5'];
buffer.on('data', function(data) {
assert.strictEqual(tests[count], data);
++count;
});
buffer.on('end', function() {
assert.strictEqual(count, 5);
});
/* emit events */
input.emit('data', tests[0]);
input.emit('data', tests[1]);
input.emit('data', tests[2]);
assert.strictEqual(buffer.length, 3, 'Invalid buffer length');
buffer.resume();
assert.strictEqual(buffer.length, 0);
input.emit('data', tests[3]);
buffer.pause();
input.emit('data', tests[4]);
input.emit('end');
assert.strictEqual(buffer.length, 1);
buffer.resume();
assert.strictEqual(buffer.length, 0);
assert(!buffer.writable);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment