-
-
Save chrisdickinson/57d2debb87cbbb7e6f27 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Public entry point: the module exports the `pooled` stream factory.
module.exports = pooled

// `through` builds the duplex stream that `pooled` wraps.
var through = require('through')
/**
 * Create a through-stream that pools written buffers and hands them to a
 * consumer in reads of a requested size.
 *
 * `fn` is called once, synchronously, with a `read(num, onread)` function.
 * `read` records how many bytes the consumer wants and the callback that
 * receives them. Whenever a write leaves at least `num` bytes pooled,
 * `onread` is invoked on the next tick (with the stream as `this`) with a
 * Buffer of exactly `num` bytes; any surplus stays pooled for a later read.
 *
 * @param {Function} fn - receives the `read(num, onread)` registration function.
 * @returns {Object} the stream returned by `through(write, end)`.
 */
function pooled(fn) {
  var stream = through(write, end)
  var pool = []        // chunks not yet handed to the consumer, oldest first
  var want = 0         // bytes requested by the most recent `read` call
  var got = 0          // total bytes currently held in `pool`
  var ondrain = false  // true while a one-shot 'drain' listener is pending
  var onread           // consumer callback registered through `read`

  fn(read)

  return stream

  // Write handler passed to `through`: pool the chunk, then try to satisfy
  // the outstanding read.
  function write(buf) {
    pool.push(buf)
    got += buf.length

    if(stream.paused && !ondrain) {
      // Stream is paused: retry once it drains instead of dispatching now.
      ondrain = true
      stream.once('drain', function() {
        ondrain = false
        exec()
      })
      return
    }

    // NOTE(review): when the stream is paused AND a drain listener is already
    // pending, this still dispatches immediately (original behavior kept) —
    // confirm that is intended.
    exec()
  }

  // Deliver one read of `want` bytes if the pool holds enough.
  function exec() {
    // No consumer registered yet: keep pooling rather than throwing a
    // TypeError from the nextTick callback.
    if(typeof onread !== 'function') {
      return
    }

    // Not enough data yet. `got` must stay untouched so that it keeps
    // matching the bytes held in `pool` and partial writes can accumulate.
    if(got < want) {
      return
    }

    var input = build_buffer(want)
    got -= want

    // Pause until the consumer has been handed this read.
    stream.pause()
    process.nextTick(function() {
      onread.call(stream, input)
      stream.resume()
    })
  }

  // Remove the first `num` bytes from the pool and return them as one Buffer.
  function build_buffer(num) {
    // Zero-filled allocation; `new Buffer(num)` is deprecated and would
    // expose uninitialised memory.
    var out = Buffer.alloc(num)
    var offset = 0
    var tocopy = 0
    var len = pool.length
    var i

    for(i = 0; i < len; ++i) {
      // Copy only what is still missing, so a read that spans several chunks
      // stops exactly at `num` bytes.
      tocopy = Math.min(pool[i].length, num - offset)
      pool[i].copy(out, offset, 0, tocopy)
      offset += tocopy

      if(offset === num) {
        break
      }
    }

    if(i === len) {
      // exhausted pool entirely
      pool.length = 0
    } else if(tocopy < pool[i].length) {
      // partially exhausted last member: keep its unread tail at the front
      // of the pool and drop every chunk before it.
      pool.splice(0, i + 1, pool[i].slice(tocopy))
    } else {
      // completely exhausted last member: discard everything up to and
      // including that member.
      pool.splice(0, i + 1)
    }

    return out
  }

  // End handler passed to `through`: emit an error if fewer than `want`
  // bytes remain pooled, otherwise queue `null` on the stream.
  function end() {
    if(got < want) {
      stream.emit('error', new Error('not enough bytes'))
      return
    }

    stream.queue(null)
  }

  // Registration function given to `fn`: remember the read size and callback.
  function read(num, _onread) {
    want = num
    onread = _onread
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment