Skip to content

Instantly share code, notes, and snippets.

@chrisdickinson
Created April 9, 2013 18:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chrisdickinson/57d2debb87cbbb7e6f27 to your computer and use it in GitHub Desktop.
Save chrisdickinson/57d2debb87cbbb7e6f27 to your computer and use it in GitHub Desktop.
// Expose `pooled` as this module's export. The assignment can precede the
// definition because the `pooled` function declaration below is hoisted.
module.exports = pooled
// `through` builds the stream object that `pooled` returns; it is called
// with a write handler and an end handler.
var through = require('through')
/**
 * Create a `through` stream that pools incoming buffers and hands them to a
 * reader in chunks of exactly the size the reader asked for.
 *
 * `fn` is invoked once, synchronously, with a `read(num, onread)` function.
 * Calling `read` records that the reader wants `num` bytes; as soon as that
 * many bytes have been written to the stream, `onread` is called (on a later
 * tick, with `this` set to the stream) with a Buffer of exactly `num` bytes.
 * The request stays in force until `read` is called again, so a reader that
 * never re-arms keeps receiving `num`-byte chunks. `onread` may call `read`
 * again to change the size of the next chunk.
 *
 * Written values are expected to be Buffers (they must expose `.length`,
 * `.copy` and `.slice`).
 *
 * @param {function(function(number, function(Buffer)))} fn - receives `read`.
 * @returns {Stream} the stream created by `through(write, end)`. It emits
 *   'error' with `Error('not enough bytes')` if the input ends while fewer
 *   than the currently wanted number of bytes are pooled.
 */
function pooled(fn) {
  var stream = through(write, end)
    , pool = []        // buffers written but not yet handed to the reader
    , want = 0         // bytes the reader asked for in its latest read()
    , got = 0          // total bytes currently held in `pool`
    , ondrain = false  // true while a one-shot 'drain' listener is pending
    , onread

  fn(read)

  return stream

  // through() write handler: pool the chunk, then try to satisfy the reader.
  function write(buf) {
    pool.push(buf)
    got += buf.length

    if(stream.paused) {
      // A delivery is in flight (or downstream asked us to pause). Wait for
      // 'drain' before slicing the pool so that `want` reflects any read()
      // issued by the pending onread call. Only one listener is registered
      // no matter how many writes arrive while paused.
      if(!ondrain) {
        ondrain = true
        stream.once('drain', function() {
          ondrain = false
          exec()
        })
      }

      return
    }

    exec()
  }

  // Deliver one chunk of `want` bytes to the reader if enough are pooled.
  function exec() {
    var input

    if(got < want) {
      // Not enough yet; keep both the data and the byte count for later.
      return
    }

    input = build_buffer(want)
    got -= want

    // Pause until the reader has seen this chunk so writers get backpressure.
    stream.pause()
    process.nextTick(function() {
      onread.call(stream, input)
      stream.resume()

      // Keep draining complete chunks that are already pooled. Skipped when
      // the stream is paused again (a 'drain' listener already delivered, or
      // downstream paused us) and when want is 0, which would spin forever.
      if(!stream.paused && want > 0) {
        exec()
      }
    })
  }

  // Copy the first `num` pooled bytes into a fresh Buffer and remove them
  // from the pool, keeping the unread tail of a partially consumed chunk.
  function build_buffer(num) {
    var out = Buffer.alloc(num)
      , offset = 0
      , i = 0
      , chunk
      , tocopy

    while(offset < num && i < pool.length) {
      chunk = pool[i]
      // Never take more than what is still missing from `out`.
      tocopy = Math.min(chunk.length, num - offset)
      chunk.copy(out, offset, 0, tocopy)
      offset += tocopy

      if(tocopy < chunk.length) {
        // partially exhausted this member: keep its tail at the pool head.
        pool[i] = chunk.slice(tocopy)
        break
      }

      ++i
    }

    // discard every member that was copied in full.
    pool.splice(0, i)

    return out
  }

  // through() end handler: fail if the reader's outstanding request cannot
  // be met from what is pooled, otherwise signal end-of-stream downstream.
  function end() {
    if(got < want) {
      stream.emit('error', new Error('not enough bytes'))
      return
    }

    stream.queue(null)
  }

  // Handed to `fn`: record how many bytes the reader wants next and the
  // callback that should receive them.
  function read(num, _onread) {
    want = num
    onread = _onread
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment