Created
March 2, 2011 13:53
-
-
Save polotek/850957 to your computer and use it in GitHub Desktop.
An attempt to observe how stream.pause() behaves with different read sources
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var http = require('http') | |
, Stream = require('stream').Stream | |
, fs = require('fs') | |
, util = require('util'); | |
var slice = Array.prototype.slice; | |
/**
 * Used by BufferedStream to handle the actual buffering.
 *
 * Keeps written chunks in a FIFO queue and, while buffering is switched
 * off, forwards them to `dest` one chunk per tick.
 *
 * @param {Object} source - stream feeding this buffer; its pause() is
 *   called whenever `dest.write()` returns false
 * @param {Object} [dest] - stream receiving the chunks; may be supplied
 *   later through setDestination()
 */
function PipeState(source, dest) {
  this.buffering = true; // hold data until flush() is called
  this.size = 0;         // sum of chunk.length currently queued
  this.total = 0;        // sum of chunk.length ever queued
  this.chunks = [];
  this.source = source;
  this.dest = dest;
  this.ondrain = null;   // optional callback run when send() empties the queue
}

/**
 * Attach (or replace) the destination and start draining if we were
 * still buffering.
 */
PipeState.prototype.setDestination = function(dest) {
  this.dest = dest;
  if (this.buffering) this.flush();
};

/**
 * Queue a chunk. If buffering is off the head of the queue is sent
 * right away so that chunks always leave in arrival order.
 */
PipeState.prototype.write = function(chunk) {
  this.chunks.push(chunk);
  this.size += chunk.length;
  this.total += chunk.length;
  if (!this.buffering) this.send();
};

/** Switch buffering on; queued chunks stay put until flush(). */
PipeState.prototype.buffer = function() {
  this.buffering = true;
};

/**
 * Switch buffering off and start sending queued chunks.
 * Without a destination there is nowhere to send to, so the call is
 * ignored and the state keeps buffering instead of throwing.
 */
PipeState.prototype.flush = function() {
  if (!this.dest) return;
  this.buffering = false;
  this.send();
};

/**
 * Send the oldest queued chunk to the destination.
 * Schedules itself on the next tick while chunks remain, pauses the
 * source when the destination reports back-pressure, and calls
 * `ondrain` (if set) once the queue has been emptied.
 */
PipeState.prototype.send = function() {
  // A send scheduled earlier must not run after buffer() was called in
  // the meantime, and must not touch a destination that does not exist.
  if (this.buffering || !this.dest || !this.chunks.length) return;

  var chunk = this.chunks.shift();
  this.size -= chunk.length;

  if (this.dest.write(chunk) === false) {
    this.source.pause();
  } else if (this.chunks.length) {
    process.nextTick(this.send.bind(this));
  }

  if (!this.chunks.length && this.ondrain) this.ondrain();
};

/**
 * Stream that will buffer writes, designed for piping.
 *
 * @param {number} [limit] - queued size above which 'bufferOverrun' is
 *   emitted; when omitted the event is never emitted
 */
function BufferedStream (limit) {
  Stream.call(this);
  this.readable = true;
  this.writable = true;
  this.limit = limit;
  this.ps = new PipeState(this);
}
util.inherits(BufferedStream, Stream);

/**
 * Pipe into `dest` and release anything buffered so far.
 * @returns whatever Stream.prototype.pipe returns (the destination),
 *   so calls can be chained
 */
BufferedStream.prototype.pipe = function (dest) {
  var piped = Stream.prototype.pipe.apply(this, arguments);
  this.ps.setDestination( dest );
  return piped;
};

/**
 * Accept a chunk. It is passed straight through as a 'data' event only
 * when nothing is buffered; otherwise it is queued behind the pending
 * chunks so output order always matches input order.
 */
BufferedStream.prototype.write = function (chunk) {
  var ps = this.ps;
  if (!ps.buffering && !ps.chunks.length) {
    this.emit('data', chunk);
    return;
  }
  ps.write(chunk);
  if (this.limit < ps.size) {
    this.emit('bufferOverrun', ps.size);
  }
};

/** Start buffering incoming chunks and announce it with 'pause'. */
BufferedStream.prototype.pause = function() {
  this.ps.buffer();
  this.emit('pause');
};

/** Announce 'resume' and start draining the buffer. */
BufferedStream.prototype.resume = function() {
  this.emit('resume');
  this.ps.flush();
};

/**
 * Signal end of input. 'end' is emitted immediately when the buffer is
 * empty; otherwise it is postponed until the last queued chunk has been
 * sent, so a piped destination is not ended before it got all the data.
 */
BufferedStream.prototype.end = function () {
  var self = this;
  function finish() {
    self.ps.ondrain = null;
    self.emit('end');
    console.log('total buffered: ' + self.ps.total);
  }
  if (this.ps.chunks.length) {
    this.ps.ondrain = finish;
  } else {
    finish();
  }
};
/**
 * BufferedStream subclass that only adds reporting: it counts calls to
 * pause() and resume(), logs each one, and prints the counters on end().
 * Kept separate so the BufferedStream code stays free of logging.
 */
function WatcherStream() {
  this.pauses = 0;
  this.resumes = 0;
  // Hand every constructor argument on to the parent constructor.
  WatcherStream.super_.apply(this, arguments);
}
util.inherits(WatcherStream, BufferedStream);

/** Log the pause, delegate to the parent, then count it. */
WatcherStream.prototype.pause = function() {
  var parentPause = WatcherStream.super_.prototype.pause;
  console.error('pausing...');
  parentPause.apply(this, arguments);
  this.pauses += 1;
};

/** Log the resume, delegate to the parent, then count it. */
WatcherStream.prototype.resume = function() {
  var parentResume = WatcherStream.super_.prototype.resume;
  console.error('...resuming');
  parentResume.apply(this, arguments);
  this.resumes += 1;
};

/** Delegate to the parent end(), then print the pause/resume counters. */
WatcherStream.prototype.end = function() {
  var parentEnd = WatcherStream.super_.prototype.end;
  parentEnd.apply(this, arguments);
  console.log('pauses: ' + this.pauses);
  console.log('resumes: ' + this.resumes);
};
/**
 * Pipes the input stream to the output stream through a WatcherStream
 * (a logging BufferedStream).
 *
 * The watcher is attached to the source immediately so that it can catch
 * and buffer any data right away. Attaching the destination can
 * optionally be delayed by 5ms to simulate some async operation
 * happening in between.
 *
 * @param source - readable stream to read from
 * @param dest   - writable stream to deliver to
 * @param async  - truthy to attach `dest` after a 5ms timeout instead of
 *                 synchronously
 */
function attach(source, dest, async) {
  var watcher = new WatcherStream();
  var reportOverrun = console.log.bind(null, 'buffer full: ');

  watcher.on('bufferOverrun', reportOverrun);
  source.pipe(watcher);

  if (!async) {
    watcher.pipe(dest);
    return;
  }
  setTimeout(watcher.pipe.bind(watcher, dest), 5);
}
/**
 * Serve test pdf from local filesystem.
 *
 * Stats ./test.pdf to learn its size, sets the response headers and
 * streams the file through attach(). If the file cannot be stat'ed the
 * request is answered with 404 (file missing) or 500 (any other error)
 * instead of throwing from the callback, which would take the whole
 * server down.
 *
 * @param req   - incoming request (unused)
 * @param res   - server response to write the pdf to
 * @param async - passed through to attach()
 */
function serve_pdf(req, res, async) {
  var pdf = './test.pdf';
  fs.stat(pdf, function(err, stat) {
    if (err) {
      console.error('cannot stat ' + pdf + ': ' + err.message);
      res.statusCode = err.code === 'ENOENT' ? 404 : 500;
      res.end();
      return;
    }
    res.setHeader('Content-Type', 'application/pdf');
    res.setHeader('Content-Length', stat.size);
    res.on('drain', console.error.bind(null, 'draining...'));
    attach( fs.createReadStream(pdf), res, async );
  });
}
/**
 * Serve test pdf from mounted Samba drive (through VPN too in my case!)
 *
 * Stats /Volumes/www/test.pdf to learn its size, sets the response
 * headers and streams the file through attach(). If the file cannot be
 * stat'ed (e.g. the share is not mounted) the request is answered with
 * 404 (file missing) or 500 (any other error) instead of throwing from
 * the callback, which would take the whole server down.
 *
 * @param req   - incoming request (unused)
 * @param res   - server response to write the pdf to
 * @param async - passed through to attach()
 */
function serve_network_pdf(req, res, async) {
  var pdf = '/Volumes/www/test.pdf';
  fs.stat(pdf, function(err, stat) {
    if (err) {
      console.error('cannot stat ' + pdf + ': ' + err.message);
      res.statusCode = err.code === 'ENOENT' ? 404 : 500;
      res.end();
      return;
    }
    res.setHeader('Content-Type', 'application/pdf');
    res.setHeader('Content-Length', stat.size);
    res.on('drain', console.error.bind(null, 'draining...'));
    attach( fs.createReadStream(pdf), res, async );
  });
}
/**
 * Serve test pdf as a proxy from remote website.
 *
 * Requests the pdf from marcorogers.com and streams the upstream
 * response to the client through attach().
 *
 * - Content-Length is only forwarded when the upstream response carries
 *   one; passing `undefined` to setHeader() is rejected by Node.
 * - A failing upstream request (DNS, connection refused, reset, ...) is
 *   answered with 502 instead of being left as an unhandled 'error'
 *   event, which would crash the process.
 *
 * @param req   - incoming request (unused)
 * @param res   - server response to write the pdf to
 * @param async - passed through to attach()
 */
function serve_proxy_pdf(req, res, async) {
  var proxyReq = http.request({ host: 'marcorogers.com'
    , port: 80
    , path: '/blog/static/test.pdf'
    }
    , function(pdfRes) {
      var size = pdfRes.headers['content-length'];
      res.setHeader('Content-Type', 'application/pdf');
      if (size !== undefined) {
        res.setHeader('Content-Length', size);
      }
      res.on('drain', console.error.bind(null, 'draining...'));
      attach( pdfRes, res, async );
    });

  proxyReq.on('error', function(err) {
    console.error('proxy request failed: ' + err.message);
    // The status can only be changed while the headers are still unsent.
    if (!res.headersSent) {
      res.statusCode = 502;
    }
    res.end();
  });

  proxyReq.end();
}
/**
 * Serve test pdf using a few different methods
 *
 * /network/test.pdf - Over Samba network drive
 * /proxy/test.pdf - Proxied from remote site
 * /* - Everything else just pipes local file
 *
 * If the url contains "/async" (e.g. /async/proxy/test.pdf), that part
 * is stripped before routing and the destination is attached
 * asynchronously.
 */
http.createServer(function(req, res) {
  var url = req.url
    , async = false
    , re = /\/async/;
  if( re.test(url) ) {
    async = true;
    url = url.replace( re, '' );
  }
  console.error('serving: ' + url + ' async: ' + async);
  // Route on the stripped url; routing on req.url would send every
  // "/async/..." request to the default branch.
  switch(url) {
    case '/network/test.pdf':
      serve_network_pdf(req, res, async);
      break;
    case '/proxy/test.pdf':
      serve_proxy_pdf(req, res, async);
      break;
    default:
      serve_pdf(req, res, async);
      break;
  }
}).listen(8080);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment