-
-
Save skenqbx/059bd9fc323f6eb4ff6f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var events = require('events'); | |
var util = require('util'); | |
var fs = require('fs'); | |
// Initial size in bytes of each of the two transfer buffers a Pipe allocates.
var BUFF_SIZE = 32 * 1024; | |
// A buffer is only scheduled to grow while its length is still below this size.
var BUFF_MAX = 256 * 1024; | |
// Number of bytes added to the buffer length each time the buffers are grown.
var BUFF_GROW = 32 * 1024; | |
Pipe: (source_fd, target_stream, options: {}){ | |
events.EventEmitter.call(this); | |
this.current = options.start || 0; | |
this.end = Math.min(options.end || Number.MAX_VALUE, | |
this.current + (options.length || Number.MAX_VALUE)); | |
this.throttle = options.throttle || 0; | |
this.buffer1 = new Buffer(BUFF_SIZE); //Buffer that is currently being consumed | |
this.buffer2 = new Buffer(BUFF_SIZE); //Buffer that will be read into | |
this.numRead = 0; | |
this.blocked = false; //Can't write because .write returned false | |
this.tblocked = false; //Can't write because timeout is running | |
this.doFinish = false; | |
this.stopped = false; | |
this.bgrow = 0; //Grow buffer to this size | |
this.source = source_fd; | |
this.target = target_stream; | |
this.pBytes = 0; // this period number of bytes | |
this.pStart = Date.now(); // this period start | |
this.onRead = this.onRead.bind(this); | |
this.onDrain = this.onDrain.bind(this); | |
this.onClose = this.onClose.bind(this); | |
this.periodcb = this.periodcb.bind(this); | |
this.target.on('drain', this.onDrain); | |
this.target.on('close', this.onClose); | |
this.target.on('finish', this.onClose); | |
} | |
util.inherits(Pipe, events.EventEmitter); | |
/* Read the next chunk of data from the source */ | |
Pipe.readNext: (){ | |
if(this.stopped || this.numRead > 0) return; | |
var buffer = this.buffer2; //read into spare buffer | |
var current = this.current; | |
var limit = this.end - current; | |
if(limit === 0){ | |
this.onRead(null, 0); | |
return; | |
} | |
if(buffer.length < limit) { | |
limit = buffer.length; | |
} | |
fs.read(this.source, buffer, 0, limit, current, this.onRead); | |
}; | |
/* Remove all event listeners */ | |
Pipe.unlisten: (){ | |
this.target.removeListener('drain', this.onDrain); | |
this.target.removeListener('close', this.onClose); | |
this.target.removeListener('finish', this.onClose); | |
}; | |
/* Called when all data has been successfully written, emits 'finish' */ | |
Pipe.finish: (){ | |
if(this.stopped) return; | |
this.stopped = true; | |
this.unlisten(); | |
this.emit('finish'); | |
}; | |
/* Called when some kind of error occured, emits 'error', also stops operation */ | |
Pipe.error: (error){ | |
if(this.stopped) return; | |
this.stopped = true; | |
this.unlisten(); | |
this.emit('error', error); | |
}; | |
/* When the distination is closed */ | |
Pipe.onClose: (){ | |
this.error(new Error('Destination Closed!')); | |
}; | |
/* When some new data was read from the source */ | |
Pipe.onRead: (error, numRead){ | |
if(error){ | |
return this.error(error); | |
} | |
if(numRead === 0){ | |
this.doFinish = true; | |
} | |
this.current += numRead; | |
this.numRead = numRead; | |
this.checkWrite(); | |
}; | |
/* Writes new data to the destination if possible (and allowed) */ | |
Pipe.checkWrite: (){ | |
if(this.stopped === true) return; | |
if(!this.allowWrite()) return; | |
//There was no more data read, emit the finish event | |
if(this.doFinish){ | |
return this.finish(); | |
} | |
var numRead = this.numRead; | |
//Buffer has been drained, calculate the size we will grow | |
//it to at the next chance | |
if(numRead === 0){ | |
if(this.bgrow === 0){ | |
var buffL = this.buffer1.length; | |
if(buffL < BUFF_MAX){ | |
this.bgrow = buffL + BUFF_GROW; | |
} | |
} | |
return; | |
} | |
//The buffer wasn't entirely filled, we can only send a slice of it | |
var buffer = this.buffer2; | |
if(numRead !== buffer.length){ | |
buffer = buffer.slice(0, numRead); | |
} | |
this.pBytes += numRead; | |
this.blocked = !this.target.write(buffer); //Write the data | |
this.numRead = 0; | |
//If we marked the buffer to be grown earlier, here's our chance | |
//to do it safely | |
if(this.bgrow > 0){ | |
this.buffer2 = new Buffer(this.bgrow); //new, bigger buffers | |
this.buffer1 = new Buffer(this.bgrow); | |
this.bgrow = 0; | |
} else { | |
//Otherwise just swap the buffers | |
this.buffer2 = this.buffer1; //This is now the buffer we will read data into | |
this.buffer1 = buffer; //This buffer is currently being consumed by write | |
} | |
this.readNext(); | |
}; | |
/* When the destination has been drained and more data can be written */ | |
Pipe.onDrain: (){ | |
if(this.blocked === true){ | |
this.blocked = false; | |
this.checkWrite(); | |
} | |
}; | |
/* Called when a timeout indicated that the current writing period ended | |
* and the rate limiting allows more data */ | |
Pipe.periodcb: (){ | |
var now = Date.now(); | |
var periodElapsed = now - this.pStart; | |
this.pStart = now; | |
this.pBytes -= Math.floor(this.throttle * (periodElapsed / 1000)); | |
this.tblocked = false; | |
this.checkWrite(); | |
}; | |
/* Returns true if new data can be written to the destination | |
* Takes things like rate limiting, the return of the last 'write' | |
* (and the flush event) into account. */ | |
Pipe.allowWrite: (){ | |
if(this.blocked || this.tblocked) return false; | |
var throttle = this.throttle; | |
if(throttle > 0){ | |
var remain = throttle - this.pBytes; | |
if(remain <= 0){ | |
var now = Date.now(); | |
var periodElapsed = now - this.pStart; | |
if(periodElapsed < 1000){ | |
this.tblocked = true; | |
setTimeout(this.periodcb, 1000 - periodElapsed); | |
return false; | |
} | |
this.pStart = now; | |
this.pBytes -= Math.floor(throttle * (periodElapsed / 1000)); | |
} | |
} | |
return true; | |
}; | |
/* Pipes data from an already opened file descriptor into target_stream.
 * Creates a Pipe, reports its outcome through callback (no argument on
 * 'finish', the error on 'error'), starts the first read and returns the Pipe.
 * This function does not close source_fd.
 * NOTE(review): `options? {}` and `call(...) -> (args){ ... }` are not standard
 * JavaScript; presumably an optional parameter defaulting to {} and a
 * trailing-callback shorthand - confirm against the syntax proposal. */
module.exports.pipe_fd = (source_fd, target_stream, options? {}, callback){ | |
var pipe = new Pipe(source_fd, target_stream, options); | |
// Success: invoke the callback without arguments
pipe.on('finish') -> (){ | |
callback(); | |
} | |
// Failure: forward the error to the callback
pipe.on('error') -> (error){ | |
callback(error); | |
} | |
// Start the first read
pipe.readNext(); | |
return pipe; | |
}; | |
module.exports.pipe_path = (source_path, target_stream, options? {}, callback){ | |
fs.open(source_path, 'r') -> (error! callback, source_fd) | |
var pipe = this.pipe_fd(source_fd, target_stream, options) -> (){ | |
fs.close(source_fd); | |
callback(); | |
} | |
return pipe; | |
}; | |
module.exports.pipe = (source, target_stream, options? {}, callback){ | |
if(typeof source_path === 'string'){ | |
return this.pipe_path(source, target_stream, options, callback); | |
} else { | |
return this.pipe_fd(source, target_stream, options, callback); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
nodejs/node-v0.x-archive#8457