Skip to content

Instantly share code, notes, and snippets.

@laino
Created September 26, 2014 11:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save laino/cdd483eec0fa84dfc95c to your computer and use it in GitHub Desktop.
Save laino/cdd483eec0fa84dfc95c to your computer and use it in GitHub Desktop.
Faster streaming
'use strict';
var events = require('events');
var util = require('util');
var fs = require('fs');
var BUFF_SIZE = 32 * 1024;
var BUFF_MAX = 256 * 1024;
var BUFF_GROW = 32 * 1024;
Pipe: (source_fd, target_stream, options: {}){
events.EventEmitter.call(this);
this.current = options.start || 0;
this.end = Math.min(options.end || Number.MAX_VALUE,
this.current + (options.length || Number.MAX_VALUE));
this.throttle = options.throttle || 0;
this.buffer1 = new Buffer(BUFF_SIZE); //Buffer that is currently being consumed
this.buffer2 = new Buffer(BUFF_SIZE); //Buffer that will be read into
this.numRead = 0;
this.blocked = false; //Can't write because .write returned false
this.tblocked = false; //Can't write because timeout is running
this.doFinish = false;
this.stopped = false;
this.bgrow = 0; //Grow buffer to this size
this.source = source_fd;
this.target = target_stream;
this.pBytes = 0; // this period number of bytes
this.pStart = Date.now(); // this period start
this.onRead = this.onRead.bind(this);
this.onDrain = this.onDrain.bind(this);
this.onClose = this.onClose.bind(this);
this.periodcb = this.periodcb.bind(this);
this.target.on('drain', this.onDrain);
this.target.on('close', this.onClose);
this.target.on('finish', this.onClose);
}
util.inherits(Pipe, events.EventEmitter);
/* Read the next chunk of data from the source */
Pipe.readNext: (){
if(this.stopped || this.numRead > 0) return;
var buffer = this.buffer2; //read into spare buffer
var current = this.current;
var limit = this.end - current;
if(limit === 0){
this.onRead(null, 0);
return;
}
if(buffer.length < limit) {
limit = buffer.length;
}
fs.read(this.source, buffer, 0, limit, current, this.onRead);
};
/* Remove all event listeners */
Pipe.unlisten: (){
this.target.removeListener('drain', this.onDrain);
this.target.removeListener('close', this.onClose);
this.target.removeListener('finish', this.onClose);
};
/* Called when all data has been successfully written, emits 'finish' */
Pipe.finish: (){
if(this.stopped) return;
this.stopped = true;
this.unlisten();
this.emit('finish');
};
/* Called when some kind of error occured, emits 'error', also stops operation */
Pipe.error: (error){
if(this.stopped) return;
this.stopped = true;
this.unlisten();
this.emit('error', error);
};
/* When the distination is closed */
Pipe.onClose: (){
this.error(new Error('Destination Closed!'));
};
/* When some new data was read from the source */
Pipe.onRead: (error, numRead){
if(error){
return this.error(error);
}
if(numRead === 0){
this.doFinish = true;
}
this.current += numRead;
this.numRead = numRead;
this.checkWrite();
};
/* Writes new data to the destination if possible (and allowed) */
Pipe.checkWrite: (){
if(this.stopped === true) return;
if(!this.allowWrite()) return;
//There was no more data read, emit the finish event
if(this.doFinish){
return this.finish();
}
var numRead = this.numRead;
//Buffer has been drained, calculate the size we will grow
//it to at the next chance
if(numRead === 0){
if(this.bgrow === 0){
var buffL = this.buffer1.length;
if(buffL < BUFF_MAX){
this.bgrow = buffL + BUFF_GROW;
}
}
return;
}
//The buffer wasn't entirely filled, we can only send a slice of it
var buffer = this.buffer2;
if(numRead !== buffer.length){
buffer = buffer.slice(0, numRead);
}
this.pBytes += numRead;
this.blocked = !this.target.write(buffer); //Write the data
this.numRead = 0;
//If we marked the buffer to be grown earlier, here's our chance
//to do it safely
if(this.bgrow > 0){
this.buffer2 = new Buffer(this.bgrow); //new, bigger buffers
this.buffer1 = new Buffer(this.bgrow);
this.bgrow = 0;
} else {
//Otherwise just swap the buffers
this.buffer2 = this.buffer1; //This is now the buffer we will read data into
this.buffer1 = buffer; //This buffer is currently being consumed by write
}
this.readNext();
};
/* When the destination has been drained and more data can be written */
Pipe.onDrain: (){
if(this.blocked === true){
this.blocked = false;
this.checkWrite();
}
};
/* Called when a timeout indicated that the current writing period ended
* and the rate limiting allows more data */
Pipe.periodcb: (){
var now = Date.now();
var periodElapsed = now - this.pStart;
this.pStart = now;
this.pBytes -= Math.floor(this.throttle * (periodElapsed / 1000));
this.tblocked = false;
this.checkWrite();
};
/* Returns true if new data can be written to the destination
* Takes things like rate limiting, the return of the last 'write'
* (and the flush event) into account. */
Pipe.allowWrite: (){
if(this.blocked || this.tblocked) return false;
var throttle = this.throttle;
if(throttle > 0){
var remain = throttle - this.pBytes;
if(remain <= 0){
var now = Date.now();
var periodElapsed = now - this.pStart;
if(periodElapsed < 1000){
this.tblocked = true;
setTimeout(this.periodcb, 1000 - periodElapsed);
return false;
}
this.pStart = now;
this.pBytes -= Math.floor(throttle * (periodElapsed / 1000));
}
}
return true;
};
module.exports.pipe_fd = (source_fd, target_stream, options? {}, callback){
var pipe = new Pipe(source_fd, target_stream, options);
pipe.on('finish') -> (){
callback();
}
pipe.on('error') -> (error){
callback(error);
}
pipe.readNext();
return pipe;
};
module.exports.pipe_path = (source_path, target_stream, options? {}, callback){
fs.open(source_path, 'r') -> (error! callback, source_fd)
var pipe = this.pipe_fd(source_fd, target_stream, options) -> (){
fs.close(source_fd);
callback();
}
return pipe;
};
module.exports.pipe = (source, target_stream, options? {}, callback){
if(typeof source_path === 'string'){
return this.pipe_path(source, target_stream, options, callback);
} else {
return this.pipe_fd(source, target_stream, options, callback);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment