Skip to content

Instantly share code, notes, and snippets.

@bennadel
Created May 16, 2015 21:06
Show Gist options
  • Save bennadel/fd04058bc8c3e424bb33 to your computer and use it in GitHub Desktop.
Save bennadel/fd04058bc8c3e424bb33 to your computer and use it in GitHub Desktop.
Exposing Promise / Deferred Functionality On Streams In Node.js
// Require our core node modules.
var Buffer = require( "buffer" ).Buffer;
var Q = require( "q" );
var stream = require( "stream" );
var util = require( "util" );
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
// I am a factory function for creating writable content streams. If maxContentLength
// is omitted, the content stream will use the default value.
exports.createWriteStream = function( maxContentLength ) {
return( new ContentStream( maxContentLength ) );
};
// I am a writable stream that accumulates content across writes and emits a "content"
// event once the entirety of the content has been aggregated. The content is emitted
// as a UTF-8 encoded value (for now).
function ContentStream( newMaxContentLength ) {
// Call the super constructor.
stream.Writable.call( this );
// I am the max length that the aggregated content size can be. If the max size is
// exceeded, an error is emitted.
this.maxContentLength = ( newMaxContentLength || 65536 ); // 64Kb.
// I hold the running sum of buffer lengths being aggregated internally. I am used
// to validate the max content length.
this._contentLength = 0 ;
// I hold the individual chunks across writes so that we don't have to concat all
// the chunk values until the stream is finished.
this._buffers = [];
// I am the deferred value representation of the stream state.
// --
// NOTE: The stream and the deferred value are only linked because we say they are
// linked. As such, it's up to us to determine the interplay between stream events
// and the deferred value.
this._deferred = Q.defer();
// Expose the promise on the stream.
var promise = this.promise = this._deferred.promise;
// Expose the promise methods on the stream itself (as a convenience). Since the
// Promise is implemented using prototypal inheritance (as opposed to relying on
// lexical binding), we have to bind the method references back to the promise.
// --
// NOTE: This makes the stream a "thenable" object.
this.then = promise.then.bind( promise );
this.catch = promise.catch.bind( promise );
this.finally = promise.finally.bind( promise );
// When the stream is closed, and the finish event is emitted; at that point, we can
// flatten all of the aggregated buffers.
this.once( "finish", this._handleFinishEvent );
// When the content event is emitted, we can use that resolve the deferred value.
// --
// NOTE: We are using .once() since we can only resolve the value once.
this.once( "content", this._deferred.resolve );
// If anything goes wrong, the stream will emit an error, which we can use to reject
// the deferred value.
// --
// CAUTION: Since we are binding to the error event, it will be sufficient to stop
// the error event from bubbling up as an exception. However, any existing "pipe"
// will be automatically unpiped due to the default stream behavior. That said, we
// are using .on() (instead of .once()) so that we will catch more than one error
// event. Just because we reject the deferred value, it doesn't mean that we want to
// start letting subsequent error events go unhandled.
this.on( "error", this._deferred.reject );
}
util.inherits( ContentStream, stream.Writable );
// ---
// PRIVATE METHODS.
// ---
// I handle the finish event emitted on the stream, which is emitted once the write
// stream has been closed.
ContentStream.prototype._handleFinishEvent = function() {
// Collapse all the buffers into a single string value.
var content = Buffer.concat( this._buffers ).toString( "utf-8" );
this.emit( "content", content );
};
// I consume the chunk of data being written to the stream.
ContentStream.prototype._write = function( chunk, encoding, chunkConsumed ) {
// The stream and the underlying deferred value are not inherently linked. As such,
// there's nothing that will stop the stream from accepting writes just because the
// deferred value has been resolved or rejected. As such, we have to reject any write
// that is executed after the deferred value is no longer in a pending state.
if ( ! this.promise.isPending() ) {
return( chunkConsumed( new Error( "Stream is no longer pending." ) ) );
}
// Check to see if the incoming chunk puts the accumulated content length over
// the max allowed length. If so, pass-through an error (which will lead to an
// error event being emitted, which will lead to our deferred value being rejected).
if ( ( this._contentLength += chunk.length ) > this.maxContentLength ) {
return( chunkConsumed( new Error( "Content too large." ) ) );
}
this._buffers.push( chunk );
chunkConsumed();
};
// Require our core node modules.
var http = require( "http" );
var util = require( "util" );
// Require our core application modules.
var createContentStream = require( "./content-stream" ).createWriteStream;
// ----------------------------------------------------------------------------------- //
// ----------------------------------------------------------------------------------- //
// Create a simple HTTP server so that we can aggregate incoming request content.
var httpServer = http.createServer(
function handleHttpRequest( request, response ) {
// Pipe the request into a ContentStream, which will aggregate the request
// body and expose the content through a promise value.
// --
// NOTE: the ContentStream has a ".promise" property; but, as a convenience, it
// also directly exposes ".then()", ".catch()", and ".finally()" methods which
// are bound to the underlying promise.
request
.pipe( createContentStream() ) // <-- Returns stream, exposes .then().
.on(
"error",
function handleErrorEvent( error ) {
// NOTE: This is just here to demonstrate that I can listen for error
// events directly on the stream (N possible events); or, I can use
// the .catch() method (1 possible event); or, both.
console.log( "Content error:", util.inspect( error ) );
}
)
.then(
function handleResolve( content ) {
response.write( "CONTENT: " );
response.write( content );
response.write( "\n" );
}
)
.catch(
function handleReject( error ) {
response.write( "ERROR: " );
response.write( util.inspect( error ) );
response.write( "\n" );
}
)
.finally(
function handleDone() {
// No matter what happens, close the response.
response.end( "Fin!" );
}
)
;
}
);
httpServer.listen( 8080 );
console.log( "Node server listening on port 8080." );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment