Created
May 16, 2015 21:06
-
-
Save bennadel/fd04058bc8c3e424bb33 to your computer and use it in GitHub Desktop.
Exposing Promise / Deferred Functionality On Streams In Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Require our core node modules. | |
var Buffer = require( "buffer" ).Buffer; | |
var Q = require( "q" ); | |
var stream = require( "stream" ); | |
var util = require( "util" ); | |
// ----------------------------------------------------------------------------------- // | |
// ----------------------------------------------------------------------------------- // | |
// I am a factory function for creating writable content streams. If maxContentLength | |
// is omitted, the content stream will use the default value. | |
exports.createWriteStream = function( maxContentLength ) { | |
return( new ContentStream( maxContentLength ) ); | |
}; | |
// I am a writable stream that accumulates content across writes and emits a "content" | |
// event once the entirety of the content has been aggregated. The content is emitted | |
// as a UTF-8 encoded value (for now). | |
function ContentStream( newMaxContentLength ) { | |
// Call the super constructor. | |
stream.Writable.call( this ); | |
// I am the max length that the aggregated content size can be. If the max size is | |
// exceeded, an error is emitted. | |
this.maxContentLength = ( newMaxContentLength || 65536 ); // 64Kb. | |
// I hold the running sum of buffer lengths being aggregated internally. I am used | |
// to validate the max content length. | |
this._contentLength = 0 ; | |
// I hold the individual chunks across writes so that we don't have to concat all | |
// the chunk values until the stream is finished. | |
this._buffers = []; | |
// I am the deferred value representation of the stream state. | |
// -- | |
// NOTE: The stream and the deferred value are only linked because we say they are | |
// linked. As such, it's up to us to determine the interplay between stream events | |
// and the deferred value. | |
this._deferred = Q.defer(); | |
// Expose the promise on the stream. | |
var promise = this.promise = this._deferred.promise; | |
// Expose the promise methods on the stream itself (as a convenience). Since the | |
// Promise is implemented using prototypal inheritance (as opposed to relying on | |
// lexical binding), we have to bind the method references back to the promise. | |
// -- | |
// NOTE: This makes the stream a "thenable" object. | |
this.then = promise.then.bind( promise ); | |
this.catch = promise.catch.bind( promise ); | |
this.finally = promise.finally.bind( promise ); | |
// When the stream is closed, and the finish event is emitted; at that point, we can | |
// flatten all of the aggregated buffers. | |
this.once( "finish", this._handleFinishEvent ); | |
// When the content event is emitted, we can use that resolve the deferred value. | |
// -- | |
// NOTE: We are using .once() since we can only resolve the value once. | |
this.once( "content", this._deferred.resolve ); | |
// If anything goes wrong, the stream will emit an error, which we can use to reject | |
// the deferred value. | |
// -- | |
// CAUTION: Since we are binding to the error event, it will be sufficient to stop | |
// the error event from bubbling up as an exception. However, any existing "pipe" | |
// will be automatically unpiped due to the default stream behavior. That said, we | |
// are using .on() (instead of .once()) so that we will catch more than one error | |
// event. Just because we reject the deferred value, it doesn't mean that we want to | |
// start letting subsequent error events go unhandled. | |
this.on( "error", this._deferred.reject ); | |
} | |
util.inherits( ContentStream, stream.Writable ); | |
// --- | |
// PRIVATE METHODS. | |
// --- | |
// I handle the finish event emitted on the stream, which is emitted once the write | |
// stream has been closed. | |
ContentStream.prototype._handleFinishEvent = function() { | |
// Collapse all the buffers into a single string value. | |
var content = Buffer.concat( this._buffers ).toString( "utf-8" ); | |
this.emit( "content", content ); | |
}; | |
// I consume the chunk of data being written to the stream. | |
ContentStream.prototype._write = function( chunk, encoding, chunkConsumed ) { | |
// The stream and the underlying deferred value are not inherently linked. As such, | |
// there's nothing that will stop the stream from accepting writes just because the | |
// deferred value has been resolved or rejected. As such, we have to reject any write | |
// that is executed after the deferred value is no longer in a pending state. | |
if ( ! this.promise.isPending() ) { | |
return( chunkConsumed( new Error( "Stream is no longer pending." ) ) ); | |
} | |
// Check to see if the incoming chunk puts the accumulated content length over | |
// the max allowed length. If so, pass-through an error (which will lead to an | |
// error event being emitted, which will lead to our deferred value being rejected). | |
if ( ( this._contentLength += chunk.length ) > this.maxContentLength ) { | |
return( chunkConsumed( new Error( "Content too large." ) ) ); | |
} | |
this._buffers.push( chunk ); | |
chunkConsumed(); | |
}; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Require our core node modules. | |
var http = require( "http" ); | |
var util = require( "util" ); | |
// Require our core application modules. | |
var createContentStream = require( "./content-stream" ).createWriteStream; | |
// ----------------------------------------------------------------------------------- // | |
// ----------------------------------------------------------------------------------- // | |
// Create a simple HTTP server so that we can aggregate incoming request content. | |
var httpServer = http.createServer( | |
function handleHttpRequest( request, response ) { | |
// Pipe the request into a ContentStream, which will aggregate the request | |
// body and expose the content through a promise value. | |
// -- | |
// NOTE: the ContentStream has a ".promise" property; but, as a convenience, it | |
// also directly exposes ".then()", ".catch()", and ".finally()" methods which | |
// are bound to the underlying promise. | |
request | |
.pipe( createContentStream() ) // <-- Returns stream, exposes .then(). | |
.on( | |
"error", | |
function handleErrorEvent( error ) { | |
// NOTE: This is just here to demonstrate that I can listen for error | |
// events directly on the stream (N possible events); or, I can use | |
// the .catch() method (1 possible event); or, both. | |
console.log( "Content error:", util.inspect( error ) ); | |
} | |
) | |
.then( | |
function handleResolve( content ) { | |
response.write( "CONTENT: " ); | |
response.write( content ); | |
response.write( "\n" ); | |
} | |
) | |
.catch( | |
function handleReject( error ) { | |
response.write( "ERROR: " ); | |
response.write( util.inspect( error ) ); | |
response.write( "\n" ); | |
} | |
) | |
.finally( | |
function handleDone() { | |
// No matter what happens, close the response. | |
response.end( "Fin!" ); | |
} | |
) | |
; | |
} | |
); | |
httpServer.listen( 8080 ); | |
console.log( "Node server listening on port 8080." ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment