Skip to content

Instantly share code, notes, and snippets.

@julien-f
Last active May 10, 2018 19:58
Show Gist options
  • Save julien-f/34e1807c5e6cad71f342137489749c24 to your computer and use it in GitHub Desktop.
Save julien-f/34e1807c5e6cad71f342137489749c24 to your computer and use it in GitHub Desktop.
/**
 * Reads `n` bytes from a readable stream.
 *
 * The returned promise settles in one of three ways:
 * - resolves with `{ done: false, value }` once `n` bytes have been collected;
 * - resolves with `{ done: true, value }` if the stream ends first, `value`
 *   holding whatever was collected until then (possibly an empty Buffer);
 * - rejects with the stream's error if it emits 'error' first.
 *
 * The listeners installed on the stream are removed before the promise
 * settles.
 *
 * @param {object} stream - Readable stream; its chunks are joined with
 *   `Buffer.concat`, so they are expected to be Buffers.
 * @param {number} n - Number of bytes wanted.
 * @returns {Promise<{ done: boolean, value: Buffer }>}
 */
export default function readChunk (stream, n) {
  return new Promise((resolve, reject) => {
    // A stream which has already emitted 'end' will never emit it again:
    // waiting for events would leave this promise pending forever.
    // `readableEnded` is undefined on Node versions that predate it, in which
    // case the event-based path below is used as before.
    if (stream.readableEnded === true) {
      resolve({ done: true, value: Buffer.alloc(0) })
      return
    }

    const chunks = []
    let i = 0 // number of bytes collected so far

    function clean () {
      stream.removeListener('readable', onReadable)
      stream.removeListener('end', onEnd)
      stream.removeListener('error', onError)
    }

    function settle (done) {
      clean()
      resolve({
        done,
        value: Buffer.concat(chunks, i),
      })
    }

    function onEnd () {
      settle(true)
    }

    function onError (error) {
      clean()
      reject(error)
    }

    function onReadable () {
      // only ask for what is still missing
      const chunk = stream.read(n - i)
      if (chunk === null) {
        return // wait for more data
      }
      i += chunk.length
      chunks.push(chunk)
      if (i >= n) {
        settle(false)
      }
    }

    stream.on('end', onEnd)
    stream.on('error', onError)
    stream.on('readable', onReadable)

    // data may already be buffered, in which case no new 'readable' event is
    // guaranteed to come
    if (stream.readable) {
      onReadable()
    }
  })
}
/**
 * Iteration result, shaped like the `{ done, value }` objects of the
 * iterator protocol.
 *
 * @constructor
 * @param {boolean} done - Whether the iteration is over.
 * @param {*} value - Value carried by this result.
 */
function Record (done, value) {
  Object.assign(this, { done, value })
}

// Terminal result: `done` is true and there is no value.
const DONE = new Record(true, undefined)
/**
 * 'end' listener; `this` is the iterator it has been bound to.
 *
 * Settles the pending request, if any, makes every later request resolve
 * with `DONE` and closes the iterator.
 */
function onEnd () {
  if (this._resolve !== undefined) {
    if (this._i !== 0) {
      // hand over the bytes collected so far as a last, shorter chunk
      resolve(this)
    } else {
      // nothing collected: the pending request must still be settled,
      // otherwise its promise would stay pending forever
      this._resolve(DONE)
    }
  }
  this._promise = Promise.resolve(DONE)
  close(this)
}
/**
 * 'error' listener; `this` is the iterator it has been bound to.
 *
 * Rejects the pending request with `error` when there is one; otherwise
 * stores a rejected promise so that the next request receives the error.
 * The iterator is closed in both cases.
 *
 * @param {*} error - Value emitted with the 'error' event.
 */
function onError (error) {
  const { _reject: rejectPending } = this
  if (rejectPending === undefined) {
    // nobody is waiting: keep the failure for the next request
    this._promise = Promise.reject(error)
  } else {
    rejectPending(error)
  }
  close(this)
}
/**
 * 'readable' listener; `this` is the iterator it has been bound to.
 *
 * Does nothing unless a request is pending. Otherwise reads the bytes still
 * missing to reach `_n`, stores what it got and resolves the request once
 * `_n` bytes have been collected.
 */
function onReadable () {
  if (this._resolve === undefined) {
    // no pending request: leave the data in the stream
    return
  }

  const collected = this._i
  const chunk = this._stream.read(this._n - collected)
  if (chunk === null) {
    // nothing to read for now
    return
  }

  this._chunks.push(chunk)
  const total = collected + chunk.length
  this._i = total
  if (total >= this._n) {
    resolve(this)
  }
}
/**
 * Detaches an iterator from its stream and drops its internal state.
 *
 * Only `_promise` (the latest promise) is kept, along with the scalar
 * `_i`/`_n` fields which are left as they are.
 *
 * Calling it again on an already closed iterator is a no-op.
 *
 * @param {object} it - Iterator to close.
 */
function close (it) {
  const stream = it._stream
  if (stream === undefined) {
    // already closed: there is no stream left to detach from
    return
  }

  // remove stream listeners
  stream.removeListener('end', it._onEnd)
  stream.removeListener('error', it._onError)
  stream.removeListener('readable', it._onReadable)

  // remove every properties except the latest promise
  it._chunks = it._handlePromise = it._onEnd = it._onError = it._onReadable = it._reject = it._resolve = it._stream = undefined
}
/**
 * Promise executor; `this` is the iterator it has been bound to.
 *
 * Stores the settling functions of a newly created promise on the iterator
 * so that it can be settled later.
 *
 * @param {Function} resolve - Resolves the promise being created.
 * @param {Function} reject - Rejects the promise being created.
 */
function handlePromise (resolve, reject) {
  const settlers = { _resolve: resolve, _reject: reject }
  Object.assign(this, settlers)
}
/**
 * Resolves the pending request of an iterator with the bytes collected so
 * far, then resets the iterator so that a new request can be made.
 *
 * @param {object} it - Iterator with a pending request (`_resolve` set).
 */
function resolve (it) {
  const value = Buffer.concat(it._chunks, it._i)
  it._resolve(new Record(false, value))

  // start collecting from scratch, reusing the same array
  it._i = 0
  it._chunks.length = 0

  // no request is pending anymore
  it._resolve = undefined
  it._reject = undefined
  it._promise = undefined
}
/**
 * Async iterator over a readable stream, yielding chunks of `n` bytes.
 *
 * At most one request is in flight: while a request is pending, `next()`
 * keeps returning the same promise.
 */
class ChunkIterator {
  /**
   * @param {object} stream - Stream to read from.
   * @param {number} n - Size in bytes of the chunks to produce.
   */
  constructor (stream, n) {
    this._chunks = []
    this._handlePromise = handlePromise.bind(this)
    this._i = 0
    this._n = n
    this._promise = undefined
    this._reject = undefined
    this._resolve = undefined
    this._stream = stream

    // bound listeners are kept on the instance so they can be removed later
    const endListener = onEnd.bind(this)
    this._onEnd = endListener
    stream.on('end', endListener)

    const errorListener = onError.bind(this)
    this._onError = errorListener
    stream.on('error', errorListener)

    const readableListener = onReadable.bind(this)
    this._onReadable = readableListener
    stream.on('readable', readableListener)
  }

  /** An iterator is its own async iterable. */
  [Symbol.asyncIterator] () {
    return this
  }

  /**
   * Requests the next chunk.
   *
   * @returns {Promise} The current promise if there is one, otherwise a new
   *   pending one.
   */
  next () {
    if (this._promise !== undefined) {
      return this._promise
    }

    // the local reference matters: reading below may settle the request
    // right away, which resets `this._promise`
    const pending = new Promise(this._handlePromise)
    this._promise = pending
    if (this._stream.readable) {
      this._onReadable()
    }
    return pending
  }

  /**
   * Stops the iteration.
   *
   * @param {*} value - Value of the final record.
   * @returns {Promise} Resolved with a done record carrying `value`.
   */
  return (value) {
    close(this)
    const finished = Promise.resolve(new Record(true, value))
    this._promise = finished
    return finished
  }

  /**
   * Stops the iteration with a failure.
   *
   * @param {*} exception - Rejection reason.
   * @returns {Promise} Rejected with `exception`.
   */
  throw (exception) {
    close(this)
    const failed = Promise.reject(exception)
    this._promise = failed
    return failed
  }
}

/**
 * Creates an async iterator over `stream` yielding chunks of `n` bytes.
 */
export default (stream, n) => new ChunkIterator(stream, n)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment