Skip to content

Instantly share code, notes, and snippets.

@thiagozs
Forked from 4poc/LimitStream.js
Created May 6, 2016 14:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thiagozs/c0cd46f7701acf1f84fb25a5f7bc8fdb to your computer and use it in GitHub Desktop.
Save thiagozs/c0cd46f7701acf1f84fb25a5f7bc8fdb to your computer and use it in GitHub Desktop.
Node.js: LimitStream (Bandwidth limited Readable+Writable Stream)
var fs = require('fs'),
util = require('util'),
Stream = require('stream').Stream;
/**
* Create a bandwidth limited stream
*
* This is a read+writeable stream that can limit how fast it
* is written onto by emitting pause and resume events to
* maintain a specified bandwidth limit, that limit can
* furthermore be changed during the transfer.
*/
function LimitStream() {
this.readable = true;
this.writable = true;
this.limit = null;
this.sentBytes = this.tmpSentBytes = 0;
this.startTime = this.tmpStartTime = new Date();
}
util.inherits(LimitStream, Stream);
/**
* Sets a bandwidth limit in KiB/s
*
* Change or sets the bandwidth limit, this also resets
* the temporary variables tmpSentBytes and tmpStartTime.
* There extra temporary values because we want to be able
* to access the global transfer traffic and duration.
* You can change the bandwidth during the transfer.
*
* @param limit the bandwidth (in KiB/s)
*/
LimitStream.prototype.setLimit = function (limit) {
this.limit = (limit * 1024) / 1000.0; // converts to bytes per ms
this.tmpSentBytes = 0;
this.tmpStartTime = new Date();
};
LimitStream.prototype.write = function (data) {
var self = this;
this.sentBytes += data.length;
this.tmpSentBytes += data.length;
console.log('emit data');
this.emit('data', data);
if (self.limit) {
var elapsedTime = new Date() - this.tmpStartTime,
assumedTime = this.tmpSentBytes / this.limit,
lag = assumedTime - elapsedTime;
if (lag > 0) {
console.log('emit pause, will resume in: ' + lag + 'ms');
this.emit('pause');
setTimeout(function () {
console.log('emit resume');
self.emit('resume');
}, lag);
}
}
};
LimitStream.prototype.end = function () {
console.log('emit end');
this.emit('end');
};
LimitStream.prototype.error = function (err) {
console.log('emit error: ' + err);
this.emit('error', err);
};
LimitStream.prototype.close = function () {
console.log('emit close');
this.emit('close');
};
LimitStream.prototype.destroy = function () {
console.log('emit destroy');
this.emit('destroy');
};
var readStream = fs.createReadStream('/tmp/test');
var limitStream = new LimitStream();
limitStream.setLimit(120); // in KiB/s
// pipe readable stream (fs read) into the writable+readable limit stream
readStream.pipe(limitStream);
limitStream.on('pause', function () {
readStream.pause();
});
limitStream.on('resume', function () {
readStream.resume();
});
// ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment