Last active
April 12, 2016 21:23
-
-
Save shirish87/32518d60fce6c3ff6c7ed3aa905c14cd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Readable = require('stream').Readable; | |
var Writable = require('stream').Writable; | |
var util = require('util'); | |
/**
 * Output routine shared by every chunk: logs the chunk's `data` to the console
 * and then signals completion.
 *
 * Must be invoked with `this` set to the chunk (the stream factories attach it
 * with `writeFn.bind(chunk)`).
 *
 * @param {Function} [callback] - Invoked with no arguments after logging.
 *   Anything that is not a function is ignored.
 */
function writeFn(callback) {
  console.log(this.data);
  // Guard on the type rather than truthiness so a stray non-function argument
  // cannot raise a TypeError.
  if (typeof callback === 'function') {
    callback();
  }
}
/**
 * Creates an object-mode Readable that emits the letters 'a' through 'z' in a
 * repeating cycle, pushing one chunk `interval` milliseconds after each read
 * request.
 *
 * Each chunk has the shape `{ id, data, write }`, where `data` is
 * `id + '-' + letter` and `write` is `writeFn` bound to the chunk.
 *
 * @param {*} id - Identifier copied onto every chunk and prefixed to its data.
 * @param {number} interval - Delay in milliseconds before a chunk is pushed.
 * @param {number} [max] - Number of chunks to emit before the stream ends.
 *   When it is not greater than 0 (or omitted) the stream never ends.
 * @returns {Readable} The configured stream.
 */
function newLetterStream(id, interval, max) {
  var firstCode = 'a'.charCodeAt(0);
  var lastCode = 'z'.charCodeAt(0);
  var src = new Readable({ objectMode: true });
  var emitted = 0;

  // `iter` is the char code of the most recently emitted letter; it starts one
  // below 'a' because it is pre-incremented when a chunk is built.
  src.iter = firstCode - 1;

  src._read = function () {
    // End only after `max` chunks have actually been scheduled, so the stream
    // yields exactly `max` chunks.
    if (max > 0 && emitted >= max) {
      this.push(null);
      return;
    }
    emitted++;

    // Wrap around to 'a' once 'z' has been emitted.
    if (src.iter >= lastCode) src.iter = firstCode - 1;

    setTimeout(function () {
      var chunk = { id: id, data: id + '-' + String.fromCharCode(++src.iter) };
      chunk.write = writeFn.bind(chunk);
      src.push(chunk);
    }, interval);
  };

  return src;
}
/**
 * Creates an object-mode Readable that emits the numbers 1 through 9 in a
 * repeating cycle, pushing one chunk `interval` milliseconds after each read
 * request.
 *
 * Each chunk has the shape `{ id, data, write }`, where `data` is
 * `id + '-' + number` and `write` is `writeFn` bound to the chunk.
 *
 * @param {*} id - Identifier copied onto every chunk and prefixed to its data.
 * @param {number} interval - Delay in milliseconds before a chunk is pushed.
 * @param {number} [max] - Number of chunks to emit before the stream ends.
 *   When it is not greater than 0 (or omitted) the stream never ends.
 * @returns {Readable} The configured stream.
 */
function newNumberStream(id, interval, max) {
  var lastNumber = 9;
  var src = new Readable({ objectMode: true });
  var emitted = 0;

  // `iter` is the most recently emitted number; it is pre-incremented when a
  // chunk is built, so the first value emitted is 1.
  src.iter = 0;

  src._read = function () {
    // End only after `max` chunks have actually been scheduled, so the stream
    // yields exactly `max` chunks.
    if (max > 0 && emitted >= max) {
      this.push(null);
      return;
    }
    emitted++;

    // Wrap around to 1 once 9 has been emitted.
    if (src.iter >= lastNumber) src.iter = 0;

    setTimeout(function () {
      var chunk = { id: id, data: id + '-' + (++src.iter) };
      chunk.write = writeFn.bind(chunk);
      src.push(chunk);
    }, interval);
  };

  return src;
}
/**
 * Object-mode Writable that interleaves chunks arriving from several sources.
 *
 * Every chunk written to it must expose:
 *   - `id`: identifier of the source the chunk came from, and
 *   - `write(callback)`: performs the actual output, then calls the optional
 *     callback.
 *
 * When two consecutive chunks come from the same source, the newer one is held
 * in `buffer` and a buffered chunk from a different source is emitted instead
 * (if there is one). Anything still buffered is flushed once no write has
 * arrived for `maxIdleTime` milliseconds.
 *
 * @param {Array} [sockets] - Active sources; only its length is consulted
 *   (a single source with an empty buffer is passed straight through).
 * @param {number} [max] - Buffer size at which the oldest held chunk is forced
 *   out when only same-source chunks are buffered. Defaults to 10.
 */
function Balancer(sockets, max) {
  Writable.call(this, { objectMode: true });
  this.sockets = sockets || [];
  this.buffer = [];
  this.bufferMax = max || 10;
  this.maxIdleTime = 50;
  this.idleTimeout = null;
  this.lastStreamId = null;
  // Lift the listener limit (0 means unlimited) so many piped sources do not
  // trigger listener-count warnings.
  this.setMaxListeners(0);
}
util.inherits(Balancer, Writable);

// Removes and returns the first buffered chunk for which `matches(chunk)` is
// truthy; returns undefined when no chunk matches.
function takeFirstBufferedChunk(buffer, matches) {
  for (var i = 0; i < buffer.length; i++) {
    if (matches(buffer[i])) {
      return buffer.splice(i, 1)[0];
    }
  }
  return undefined;
}

/**
 * Writable implementation hook: decides which chunk (the incoming one or a
 * buffered one) is emitted now, and buffers the incoming chunk otherwise.
 *
 * @param {{id: *, write: Function}} chunk - Incoming chunk.
 * @param {string} encoding - Unused (object mode).
 * @param {Function} callback - Signals that the next chunk may be delivered.
 */
Balancer.prototype._write = function _write(chunk, encoding, callback) {
  var self = this;
  var buffer = this.buffer;
  var bufferLength = buffer.length;
  var writeChunk;

  // A write arrived, so the stream is not idle: cancel any pending flush.
  clearTimeout(this.idleTimeout);
  this.idleTimeout = null;

  if (this.lastStreamId == null) {
    // Nothing has been emitted yet. Compared against null/undefined explicitly
    // so that falsy ids such as 0 or '' are still balanced.
    writeChunk = chunk;
  } else if (this.lastStreamId === chunk.id) {
    // Second consecutive chunk from the same source.
    if (this.sockets.length === 1 && bufferLength === 0) {
      // Only one active source and nothing held back: pass straight through.
      writeChunk = chunk;
    } else {
      // Prefer the oldest buffered chunk that belongs to another source.
      writeChunk = takeFirstBufferedChunk(buffer, function (held) {
        return held.id !== chunk.id;
      });
      if (!writeChunk && bufferLength >= this.bufferMax) {
        // The buffer holds only this source's chunks and has reached
        // `bufferMax`: release the oldest one to make room.
        writeChunk = buffer.shift();
      }
      buffer.push(chunk);
    }
  } else {
    // Different source than last time: emit its oldest buffered chunk first so
    // that per-source ordering is preserved, and queue the new one behind it.
    writeChunk = takeFirstBufferedChunk(buffer, function (held) {
      return held.id === chunk.id;
    });
    if (writeChunk) {
      buffer.push(chunk);
    } else {
      writeChunk = chunk;
    }
  }

  // Arm the idle flush only when something is actually being held back.
  if (buffer.length > 0) {
    this.idleTimeout = setTimeout(function () {
      var held;
      self.idleTimeout = null;
      while ((held = buffer.shift()) != null) {
        held.write();
      }
    }, this.maxIdleTime);
  }

  if (writeChunk) {
    this.lastStreamId = writeChunk.id;
    writeChunk.write(callback);
  } else {
    // Nothing emitted this round; still let the stream deliver the next chunk.
    setImmediate(callback);
  }
};

/**
 * Immediately emits, in buffered order, every held chunk that belongs to
 * `streamId` and removes those chunks from the buffer (in place).
 *
 * @param {*} streamId - Source id whose buffered chunks should be written out.
 */
Balancer.prototype.flushChunks = function flushChunks(streamId) {
  var buffer = this.buffer;
  var i = 0;
  while (i < buffer.length) {
    if (buffer[i].id === streamId) {
      buffer[i].write();
      // Removing shifts the next element into slot `i`, so do not advance.
      buffer.splice(i, 1);
    } else {
      i++;
    }
  }
};
// Demo wiring: pipe two number streams and one faster letter stream into a
// single Balancer. `end: false` keeps the Balancer open when any one source
// finishes, so the remaining sources can keep writing.
var balancer = new Balancer();
newNumberStream(2, 100, 50).pipe(balancer, { end: false });
newNumberStream(3, 100, 80).pipe(balancer, { end: false });
newLetterStream(1, 25, 100).pipe(balancer, { end: false });

// Stop the process when stdout reports an error. The handler is wrapped so the
// Error object is not forwarded to process.exit() as the exit code, which
// current Node versions reject as an invalid argument type.
process.stdout.on('error', function () {
  process.exit();
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment