Created
August 26, 2012 17:43
-
-
Save skenqbx/3481927 to your computer and use it in GitHub Desktop.
Buffered Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
/* load node modules */ | |
var util = require('util'); | |
var Stream = require('stream'); | |
/** | |
* BufferedStream | |
* @constructor | |
* @extends {Stream} | |
*/ | |
function BufferedStream() { | |
Stream.call(this); | |
this.readable = true; | |
this.writable = true; | |
this.length = 0; | |
this._pendings = []; | |
this._ended = false; | |
this._paused = false; | |
} | |
util.inherits(BufferedStream, Stream); | |
module.exports = BufferedStream; | |
/** | |
* Write data to the stream. | |
*/ | |
BufferedStream.prototype.write = function(data) { | |
if (this.writable) { | |
if (!this._paused) { | |
this.emit('data', data); | |
} else { | |
this.length = this._pendings.push(data); | |
} | |
return true; | |
} else { | |
this.emit('error', new Error('Stream is not writable')); | |
} | |
}; | |
/** | |
* Terminate the stream. | |
*/ | |
BufferedStream.prototype.end = function(opt_data) { | |
if (!this._ended) { | |
if (opt_data) { | |
this.write(opt_data); | |
} | |
this._ended = true; | |
this.writable = false; | |
if (!this._paused) { | |
this.emit('end'); | |
} | |
} else { | |
this.emit('error', new Error('Stream already ended')); | |
} | |
}; | |
/** | |
* Resume 'data' events. | |
* @return {boolean} | |
*/ | |
BufferedStream.prototype.resume = function() { | |
if (this._ended && this.length === 0) { | |
return false; | |
} | |
while (this._pendings.length > 0) { | |
this.emit('data', this._pendings.shift()); | |
} | |
this.length = 0; | |
this._paused = false; | |
if (this._ended) { | |
this.emit('end'); | |
} | |
return true; | |
}; | |
/** | |
* Pause 'data' events. | |
*/ | |
BufferedStream.prototype.pause = function() { | |
this._paused = true; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
/* load modules */ | |
var assert = require('assert'); | |
var Stream = require('stream'); | |
var BufferedStream = require('./buffered_stream'); | |
/* initialize streams */ | |
var input = new Stream(); | |
var buffer = new BufferedStream(); | |
input.pipe(buffer); | |
/* enable buffering */ | |
buffer.pause(); | |
/* setup listeners */ | |
var count = 0; | |
var tests = ['event1', 'event2', 'event3', 'event4', 'event5']; | |
buffer.on('data', function(data) { | |
assert.strictEqual(tests[count], data); | |
++count; | |
}); | |
buffer.on('end', function() { | |
assert.strictEqual(count, 5); | |
}); | |
/* emit events */ | |
input.emit('data', tests[0]); | |
input.emit('data', tests[1]); | |
input.emit('data', tests[2]); | |
assert.strictEqual(buffer.length, 3, 'Invalid buffer length'); | |
buffer.resume(); | |
assert.strictEqual(buffer.length, 0); | |
input.emit('data', tests[3]); | |
buffer.pause(); | |
input.emit('data', tests[4]); | |
input.emit('end'); | |
assert.strictEqual(buffer.length, 1); | |
buffer.resume(); | |
assert.strictEqual(buffer.length, 0); | |
assert(!buffer.writable); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment