// Require the core node modules. var stream = require( "stream" ); var util = require( "util" ); var crypto = require( "crypto" ); var fileSystem = require( "fs" ); var buffer = require( "buffer" ).Buffer; // ----------------------------------------------------------------------------------- // // ----------------------------------------------------------------------------------- // // CAUTION: Run the code in the next tick to give the full prototype chain a chance to // initialize. If we try to run immediately, we'll get the function hoisting for the // ETagStream constructor, but the prototype chain will not yet be defined fully. process.nextTick( function run() { var fileReadStream = fileSystem.createReadStream( "./gina-carano.jpg" ); // Once the file is finished piping into the etagStream, it will emit an etag // event with the computed MD5 hash. var etagStream = new ETagStream() .on( "etag", function handleETag( etag ) { console.log( "ETag:", etag ); } ) ; fileReadStream.pipe( etagStream ); } ); // ----------------------------------------------------------------------------------- // // ----------------------------------------------------------------------------------- // // I provide a writable stream that will emit an "etag" event once the stream is closed. // The etag will be an MD5 hash of the content that was written to the stream. // -- // NOTE: In this version, we'll be treating the underlying Hash as a Duplex stream that // is both writable and readable. Once we make this leap, we have to assume that the Hash // stream exhibits all of the data-oriented events and buffering concerns of and writable // and readable stream. function ETagStream() { // Call the super constructor. stream.Writable.call( this ); this._hasher = crypto.createHash( "md5" ); // Listen for the "finish" event, which will indicate that we have all the data that // we need in order to generate the MD5 has of the stream content. this.once( "finish", this._handleFinish.bind( this ) ); } util.inherits( ETagStream, stream.Writable ); // --- // PRIVATE METHODS. // --- // I handle the finish event, which, in turn, emits an "etag" event. ETagStream.prototype._handleFinish = function() { // Create a closed-over reference to "this". var etagStream = this; // I hold the chunks of data that can be read out of the hash stream. Once the stream // has been fully consumed, we can concatenate this buffer to get the MD5 digest. var chunks = []; // Now that we are treating the hash as a generic stream, we have to explicitly end // the stream and listen for data events. We can't assume that the data will be // available immediately, or event in one pass. As such, we have to listen for the // "readable" and "end" events so that we know when the "etag" event can be emitted. this._hasher .on( "readable", function handleReadableEvent() { var chunk = null; // Keep reading data until the read() returns null. This will indicate // that we have fully consumed the internal buffers and we'll need to // wait for another "readable" event before reading more. while ( ( chunk = this.read() ) !== null ) { chunks.push( chunk ); } } ) .on( "end", function handleEndEvent() { // Now that we have extracted all of the chunks that represent the MD5 // hash, we can flatten them down into a single buffer and export them // as a hex-encoded string. etagStream.emit( "etag", buffer.concat( chunks ).toString( "hex" ) ); } ) // Close the writable hash stream so that it can calculate the digest internally. .end() ; }; // I write data to the etag stream. ETagStream.prototype._write = function( chunk, encoding, writeComplete ) { // Now that we are treating the hash as a generic stream, we have to worry about // back-pressure. If we write to the hash stream and it returns false, this is an // advisory response that tells us we need to stop writing until we have a // subsequent drain event (which indicates that the internal buffers of the hash // stream have been flushed and are ready to receive more data). if ( this._hasher.write( chunk, encoding ) === false ) { this._hasher.once( "drain", writeComplete ); } else { writeComplete(); } };