Skip to content

Instantly share code, notes, and snippets.

@ZJONSSON
Created March 18, 2013 17:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ZJONSSON/5189249 to your computer and use it in GitHub Desktop.
Save ZJONSSON/5189249 to your computer and use it in GitHub Desktop.
QueueWrite - asynchronous queue for stream output
var stream = require("stream"),
util = require("util");
function QueueWrite(maxOpen,fn,options) {
this.maxOpen = maxOpen;
this.open = 0;
this.queue = [];
this.fn = fn;
stream.Writable.call(this,options);
}
util.inherits(QueueWrite,stream.Writable);
QueueWrite.prototype._write = function(chunk,encoding,callback) {
var self = this;
this.queue.push(function() {
self.open += 1;
callback(); // Slot opened
self.fn(chunk,encoding,function() {
self.open -= 1;
self._pop();
});
});
this._pop();
};
QueueWrite.prototype._pop = function() {
if (this.open < this.maxOpen && this.queue.length)
this.queue.pop()();
if (!this.queue.length && !this.open && this.finished)
stream.Writable.prototype.end.call(this);
};
QueueWrite.prototype.end = function(chunk,encoding) {
if (chunk) this.write(chunk,encoding);
this.finished = true;
this._pop();
};
module.exports = QueueWrite;
var QueueWrite = require("./queuewrite.js"),
fs = require("fs");
// Asynchronously write each chunk to stdout with random delay
function delayPrint(chunk,encoding,callback) {
setTimeout(function() {
process.stdout.write(chunk);
callback();
},Math.random()*200);
}
var rStream = fs.createReadStream("./test.js",{lowWaterMark:3,highWaterMark:5,encoding:'utf-8'}),
qStream = new QueueWrite(5,delayPrint);
rStream.pipe(qStream);
function showStats() {
console.log("\n########################\nOpen: "+qStream.open+" Size of queue: "+qStream.queue.length+"\n########################");
}
var counter = setInterval(showStats,1000);
qStream.on("finish",function() {
clearInterval(counter);
console.log("Final stats:");
showStats();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment