Last active
May 10, 2018 19:58
-
-
Save julien-f/34e1807c5e6cad71f342137489749c24 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reads up to `n` bytes from a readable stream.
 *
 * Resolves with `{ done, value }`, where `value` is a Buffer made of the
 * chunks collected so far:
 * - `done: false` when `n` bytes were gathered;
 * - `done: true` when the stream ended first, in which case `value` holds
 *   whatever was read before the end (possibly nothing).
 *
 * Rejects with the stream's error if `'error'` is emitted while waiting.
 * All listeners added here are removed before the promise settles.
 *
 * @param {import('stream').Readable} stream - Source stream; its chunks are
 *   passed to `Buffer.concat`, so it is expected to produce Buffers.
 * @param {number} n - Number of bytes wanted.
 * @returns {Promise<{ done: boolean, value: Buffer }>}
 */
export default function readChunk (stream, n) {
  return new Promise((resolve, reject) => {
    // A stream that already emitted 'end' will never fire another event, so
    // waiting on listeners would leave this promise pending forever.
    // (`readableEnded` is undefined on Node versions that predate it, in
    // which case this check is simply skipped.)
    if (stream.readableEnded === true) {
      resolve({ done: true, value: Buffer.alloc(0) })
      return
    }

    const chunks = []
    let size = 0

    function cleanup () {
      stream.removeListener('readable', onReadable)
      stream.removeListener('end', onEnd)
      stream.removeListener('error', onError)
    }

    function settle (done) {
      cleanup()
      resolve({
        done,
        value: Buffer.concat(chunks, size),
      })
    }

    function onEnd () {
      settle(true)
    }

    function onError (error) {
      cleanup()
      reject(error)
    }

    function onReadable () {
      // Only ask for what is still missing so nothing extra is consumed.
      const chunk = stream.read(n - size)
      if (chunk === null) {
        return // wait for more data
      }
      size += chunk.length
      chunks.push(chunk)
      if (size >= n) {
        settle(false)
      }
    }

    stream.on('end', onEnd)
    stream.on('error', onError)
    stream.on('readable', onReadable)

    // Data may already be buffered: try once without waiting for an event.
    if (stream.readable) {
      onReadable()
    }
  })
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Iterator result object: `{ done, value }`.
 *
 * @param {boolean} done - Whether the iteration is finished.
 * @param {*} value - Value carried by this result.
 */
function Record (done, value) {
  Object.assign(this, { done, value })
}

// Shared "iteration finished" result, reused instead of allocating a new one.
const DONE = new Record(true, void 0)
/**
 * `'end'` handler, called with the iterator as `this`.
 *
 * If a consumer is waiting on a `next()` promise, that promise is settled:
 * with the buffered bytes (as a final, shorter chunk) when some were
 * collected, or with `DONE` when nothing is buffered. Without the latter the
 * pending promise would never settle once the stream ends on a chunk
 * boundary. Every later `next()` call then yields `DONE`, and the iterator
 * is detached from the stream.
 */
function onEnd () {
  const settle = this._resolve
  if (settle !== undefined) {
    if (this._i !== 0) {
      // flush the trailing partial chunk
      resolve(this)
    } else {
      // nothing buffered: the waiting consumer must still be told it is over
      settle(DONE)
    }
  }
  this._promise = Promise.resolve(DONE)
  close(this)
}
/**
 * `'error'` handler, called with the iterator as `this`.
 *
 * Rejects the pending `next()` promise when there is one; otherwise stores a
 * rejected promise so the next `next()` call reports the error. The iterator
 * is detached from the stream in both cases.
 *
 * @param {Error} error - Error emitted by the stream.
 */
function onError (error) {
  const { _reject: fail } = this
  if (fail === undefined) {
    this._promise = Promise.reject(error)
  } else {
    fail(error)
  }
  close(this)
}
/**
 * `'readable'` handler, called with the iterator as `this`.
 *
 * Does nothing unless a `next()` promise is pending. Otherwise it asks the
 * stream for the bytes still missing from the current chunk, buffers what it
 * gets and settles the pending promise once `_n` bytes have been collected.
 */
function onReadable () {
  // no consumer waiting: leave the data in the stream
  if (this._resolve === undefined) return

  const collected = this._i
  const chunk = this._stream.read(this._n - collected)
  // not enough data yet
  if (chunk === null) return

  this._chunks.push(chunk)
  const total = collected + chunk.length
  this._i = total
  if (total >= this._n) resolve(this)
}
/**
 * Detaches an iterator from its stream and drops its internal state.
 *
 * Safe to call more than once: after the first call `_stream` is cleared, so
 * later calls return immediately instead of dereferencing `undefined`.
 *
 * @param {object} it - Iterator whose listeners and state must be released.
 */
function close (it) {
  const stream = it._stream
  if (stream === undefined) {
    // already closed
    return
  }

  // remove stream listeners
  stream.removeListener('end', it._onEnd)
  stream.removeListener('error', it._onError)
  stream.removeListener('readable', it._onReadable)

  // remove every property except the latest promise
  it._chunks = it._handlePromise = it._onEnd = it._onError = it._onReadable = it._reject = it._resolve = it._stream = undefined
}
/**
 * Promise executor, called with the iterator as `this`: stores the settle
 * functions of the pending `next()` promise on the iterator.
 *
 * @param {Function} fulfil - Resolver of the new promise.
 * @param {Function} fail - Rejecter of the new promise.
 */
function handlePromise (fulfil, fail) {
  Object.assign(this, { _resolve: fulfil, _reject: fail })
}
/**
 * Settles the pending `next()` promise of `it` with the buffered bytes, then
 * resets the buffer and forgets the promise so the following `next()` call
 * creates a new one.
 *
 * @param {object} it - Iterator with a pending promise.
 */
function resolve (it) {
  const result = new Record(false, Buffer.concat(it._chunks, it._i))
  it._resolve(result)

  // empty the buffer in place
  it._i = 0
  it._chunks.length = 0

  it._resolve = undefined
  it._reject = undefined
  it._promise = undefined
}
/**
 * Async iterator over a readable stream, yielding Buffers of `n` bytes.
 *
 * Each `next()` result is a `{ done, value }` record. When the stream ends
 * with fewer than `n` bytes buffered, those bytes are yielded as a final
 * shorter chunk before the iteration reports `done: true`.
 */
class ChunkIterator {
  /**
   * @param {import('stream').Readable} stream - Stream to read from.
   * @param {number} n - Size in bytes of the chunks to yield.
   */
  constructor (stream, n) {
    this._chunks = []
    this._handlePromise = handlePromise.bind(this)
    this._i = 0
    this._n = n
    this._promise = undefined
    this._reject = undefined
    this._resolve = undefined
    this._stream = stream

    // Bound handlers are kept on the instance so they can be removed later.
    stream.on('end', (this._onEnd = onEnd.bind(this)))
    stream.on('error', (this._onError = onError.bind(this)))
    stream.on('readable', (this._onReadable = onReadable.bind(this)))
  }

  [Symbol.asyncIterator] () {
    return this
  }

  /**
   * Returns the promise of the next chunk. While a chunk is pending, or once
   * the iterator is finished, the same promise is returned on every call.
   *
   * @returns {Promise<{ done: boolean, value: (Buffer|undefined) }>}
   */
  next () {
    let promise = this._promise
    if (promise === undefined) {
      promise = this._promise = new Promise(this._handlePromise)
      // Data may already be buffered: try to read without waiting for an event.
      if (this._stream.readable) {
        this._onReadable()
      }
    }
    return promise
  }

  /**
   * Stops the iteration early.
   *
   * A still-pending `next()` promise is settled as done instead of being left
   * hanging, and calling this on an already closed iterator (e.g. after the
   * stream ended) no longer throws.
   *
   * @param {*} value - Value to report in the returned record.
   * @returns {Promise<{ done: true, value: * }>}
   */
  return (value) {
    const settlePending = this._resolve
    if (this._stream !== undefined) {
      close(this)
    }
    if (settlePending !== undefined) {
      settlePending(DONE)
    }
    return (this._promise = Promise.resolve(new Record(true, value)))
  }

  /**
   * Aborts the iteration with an error.
   *
   * A still-pending `next()` promise is rejected with the same error, and
   * calling this on an already closed iterator no longer throws.
   *
   * @param {*} exception - Reason reported by this and later calls.
   * @returns {Promise<never>} A promise rejected with `exception`.
   */
  throw (exception) {
    const rejectPending = this._reject
    if (this._stream !== undefined) {
      close(this)
    }
    if (rejectPending !== undefined) {
      rejectPending(exception)
    }
    return (this._promise = Promise.reject(exception))
  }
}
/**
 * Creates an async iterator yielding the content of `stream` in chunks of
 * `n` bytes.
 *
 * @param {import('stream').Readable} stream - Stream to read from.
 * @param {number} n - Chunk size in bytes.
 * @returns {ChunkIterator}
 */
export default (stream, n) => {
  return new ChunkIterator(stream, n)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment