Skip to content

Instantly share code, notes, and snippets.

@polotek
Created March 2, 2011 13:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save polotek/850957 to your computer and use it in GitHub Desktop.
Save polotek/850957 to your computer and use it in GitHub Desktop.
An attempt to observe how stream.pause() behaves with different read sources
var http = require('http')
, Stream = require('stream').Stream
, fs = require('fs')
, util = require('util');
var slice = Array.prototype.slice;
/**
* Used by BufferStream to handle the actual buffering.
* This is probably not the most awesome approach
*/
function PipeState(source, dest) {
this.buffering = true;
this.size = 0;
this.total = 0;
this.chunks = [];
this.source = source;
this.dest = dest;
}
PipeState.prototype.setDestination = function(dest) {
this.dest = dest;
if( this.buffering ) this.flush();
}
PipeState.prototype.write = function(chunk) {
this.chunks.push(chunk);
this.size += chunk.length;
this.total += chunk.length;
if( !this.buffering )
this.send();
}
PipeState.prototype.buffer = function() {
this.buffering = true;
}
PipeState.prototype.flush = function() {
this.buffering = false;
this.send();
}
PipeState.prototype.send = function() {
if(this.chunks.length) {
var chunk = this.chunks.shift();
if(this.dest.write( chunk ) === false) {
this.source.pause();
} else {
process.nextTick( this.send.bind(this) );
}
this.size -= chunk.length;
}
}
/**
* Stream that will buffer writes, designed for piping
*/
function BufferedStream (limit) {
this.readable = true;
this.writable = true;
this.limit = limit;
this.ps = new PipeState(this);
}
util.inherits(BufferedStream, Stream);
BufferedStream.prototype.pipe = function (dest) {
Stream.prototype.pipe.apply(this, slice.call(arguments));
this.ps.setDestination( dest );
}
BufferedStream.prototype.write = function (chunk) {
if (!this.ps.buffering) {
this.emit('data', chunk);
return;
}
this.ps.write(chunk);
if (this.limit < this.ps.size) {
this.emit('bufferOverrun', this.ps.size);
}
}
BufferedStream.prototype.pause = function() {
this.ps.buffer();
this.emit('pause');
}
BufferedStream.prototype.resume = function() {
this.emit('resume');
this.ps.flush();
}
BufferedStream.prototype.end = function () {
this.emit('end');
console.log('total buffered: ' + this.ps.total);
}
/**
* Stupid subclass used to output useful milestones.
* Wanted to keep this out of the BufferStream code
*/
function WatcherStream() {
this.pauses = 0;
this.resumes = 0;
WatcherStream.super_.apply(this, slice.call(arguments));
}
util.inherits(WatcherStream, BufferedStream);
WatcherStream.prototype.pause = function() {
console.error('pausing...');
WatcherStream.super_.prototype.pause.apply(this, slice.call(arguments));
this.pauses++;
}
WatcherStream.prototype.resume = function() {
console.error('...resuming');
WatcherStream.super_.prototype.resume.apply(this, slice.call(arguments));
this.resumes++;
}
WatcherStream.prototype.end = function() {
WatcherStream.super_.prototype.end.apply(this, slice.call(arguments));
console.log('pauses: ' + this.pauses);
console.log('resumes: ' + this.resumes);
}
/**
* Pipes input stream to output stream via a BufferedStream.
* It can also simulate delayed attachment after some async
* operation.
*
* Also note that the BufferedStream must be attach immediately,
* in order to catch any data and buffer if necessary.
*/
function attach(source, dest, async) {
var ws = new WatcherStream();
ws.on('bufferOverrun', console.log.bind(null, 'buffer full: ') );
source.pipe(ws);
if( async ) {
setTimeout( ws.pipe.bind(ws, dest), 5 );
} else {
ws.pipe(dest);
}
}
/**
* Serve test pdf from local filesystem
*/
function serve_pdf(req, res, async) {
var pdf = './test.pdf';
fs.stat(pdf, function(err, stat) {
if(err) throw err;
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Length', stat.size);
res.on('drain', console.error.bind(null, 'draining...'));
attach( fs.createReadStream(pdf), res, async );
});
}
/**
* Serve test pdf from mounted Samba drive (through VPN too in my case!)
*/
function serve_network_pdf(req, res, async) {
var pdf = '/Volumes/www/test.pdf';
fs.stat(pdf, function(err, stat) {
if(err) throw err;
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Length', stat.size);
res.on('drain', console.error.bind(null, 'draining...'));
attach( fs.createReadStream(pdf), res, async );
});
}
/**
* Serve test pdf as a proxy from remote website
*/
function serve_proxy_pdf(req, res, async) {
http.request({ host: 'marcorogers.com'
, port: 80
, path: '/blog/static/test.pdf'
}
, function(pdfRes) {
var size = pdfRes.headers['content-length'];
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Length', size);
res.on('drain', console.error.bind(null, 'draining...'));
attach( pdfRes, res, async );
}).end();
}
/**
* Serve test pdf using a few different methods
*
* /network/test.pdf - Over Samba network drive
* /proxy/test.pdf - Proxied from remote site
* /* - Everything else just pipes local file
*/
http.createServer(function(req, res) {
var url = req.url
, async = false
, re = /\/async/;
if( re.test(url) ) {
async = true;
url = url.replace( re, '' );
}
console.error('serving: ' + url + ' async: ' + async);
switch(req.url) {
case '/network/test.pdf':
serve_network_pdf(req, res, async);
break;
case '/proxy/test.pdf':
serve_proxy_pdf(req, res, async);
break;
default:
serve_pdf(req, res, async);
break;
}
}).listen(8080);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment