Skip to content

Instantly share code, notes, and snippets.

@shirish87
Last active April 12, 2016 21:23
Show Gist options
  • Save shirish87/32518d60fce6c3ff6c7ed3aa905c14cd to your computer and use it in GitHub Desktop.
Save shirish87/32518d60fce6c3ff6c7ed3aa905c14cd to your computer and use it in GitHub Desktop.
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
function writeFn(callback) {
console.log(this.data);
callback && callback();
}
function newLetterStream(id, interval, max) {
var src = new Readable({ objectMode: true });
src.iter = 96;
var cnt = max;
src._read = function () {
if (max > 0 && --cnt < 1) return this.push(null);
if (src.iter >= 'z'.charCodeAt(0)) src.iter = 96;
setTimeout(function () {
var chunk = { id: id, data: id + '-' + String.fromCharCode(++src.iter) };
chunk.write = writeFn.bind(chunk);
src.push(chunk);
}, interval);
};
return src;
}
function newNumberStream(id, interval, max) {
var src = new Readable({ objectMode: true });
src.iter = 0;
var cnt = max;
src._read = function () {
if (max > 0 && --cnt < 1) return this.push(null);
if (src.iter >= 9) src.iter = 0;
setTimeout(function () {
var chunk = { id: id, data: id + '-' + (++src.iter) };
chunk.write = writeFn.bind(chunk);
src.push(chunk);
}, interval);
};
return src;
}
function Balancer(sockets, max) {
Writable.call(this, { objectMode: true });
this.sockets = sockets || [];
this.buffer = [];
this.bufferMax = max || 10;
this.maxIdleTime = 50;
this.idleTimeout = null;
this.lastStreamId = null;
this.setMaxListeners(0);
}
util.inherits(Balancer, Writable);
Balancer.prototype._write = function _write(chunk, encoding, callback) {
clearTimeout(this.idleTimeout);
this.idleTimeout = null;
var bufferLength = this.buffer.length;
var writeChunk;
if (!this.lastStreamId) {
writeChunk = chunk;
} else if (this.lastStreamId === chunk.id) {
// received two consecutive chunks for same socketId
if (this.sockets.length === 1 && !bufferLength) {
// only one active socket, so pass-through
writeChunk = chunk;
} else {
// pop out others streamId chunk if available
for (var i = 0; i < bufferLength; i++) {
if (this.buffer[i].id !== chunk.id) {
writeChunk = this.buffer[i];
this.buffer.splice(i, 1);
break;
}
}
if (!writeChunk && bufferLength >= this.bufferMax) {
// buffer contains all items of current streamId
// hold atmost 5 items of same streamId
writeChunk = this.buffer.shift();
}
this.buffer.push(chunk);
}
} else {
// find any previous chunk for same streamId as current
for (var i = 0; i < bufferLength; i++) {
if (this.buffer[i].id === chunk.id) {
writeChunk = this.buffer[i];
this.buffer.splice(i, 1);
break;
}
}
if (writeChunk) {
this.buffer.push(chunk);
} else {
writeChunk = chunk;
}
}
var pending = this.buffer;
this.idleTimeout = setTimeout(function () {
process.nextTick(function () {
var chunk;
while ((chunk = pending.shift()) != null) {
chunk.write();
}
});
}, this.maxIdleTime);
if (writeChunk) {
this.lastStreamId = writeChunk.id;
writeChunk.write(callback);
} else {
setImmediate(callback);
}
};
Balancer.prototype.flushChunks = function flushChunks(streamId) {
for (var i = 0; i < this.buffer.length; i++) {
if (this.buffer[i].id === streamId) {
this.buffer[i].write();
this.buffer.splice(i, 1);
i--;
}
}
};
var balancer = new Balancer();
newNumberStream(2, 100, 50).pipe(balancer, { end: false });
newNumberStream(3, 100, 80).pipe(balancer, { end: false });
newLetterStream(1, 25, 100).pipe(balancer, { end: false });
process.stdout.on('error', process.exit);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment