Skip to content

Instantly share code, notes, and snippets.

@jfet97
Last active September 13, 2019 12:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jfet97/527e5f4c4b2434617482e29566e26ef8 to your computer and use it in GitHub Desktop.
Save jfet97/527e5f4c4b2434617482e29566e26ef8 to your computer and use it in GitHub Desktop.
ReadableStreamAsyncIteratorPrototype
// Prototype object for async iterators over a stream. It is created on top of
// AsyncIteratorPrototype and exposes a `stream` accessor plus `next()` and
// `return()`. Per-iterator state is read from `this` under keys defined
// outside this block (kStream, kError, kEnded, kLastPromise, kHandlePromise).
const ReadableStreamAsyncIteratorPrototype = Object.create(AsyncIteratorPrototype, {
  // Accessor returning whatever is stored under kStream on the iterator.
  stream: {
    enumerable: true,
    configurable: true,
    get() {
      return this[kStream];
    }
  },

  // next(): returns a Promise for the next iteration result.
  next: {
    writable: true,
    enumerable: true,
    configurable: true,
    value: function () {
      // An error recorded since the previous call wins over everything else.
      const pendingError = this[kError];
      if (pendingError !== null) {
        return Promise.reject(pendingError);
      }

      // Already ended: report completion immediately.
      if (this[kEnded]) {
        return Promise.resolve(createIterResult(undefined, true));
      }

      if (this[kStream].destroyed) {
        // Settle one tick later. When .destroy(err) has been called the error
        // is emitted via nextTick, so at this point an error may still be
        // waiting to surface; checking kError after the tick catches it.
        return new Promise((resolve, reject) => {
          process.nextTick(() => {
            if (!this[kError]) {
              resolve(createIterResult(undefined, true));
              return;
            }
            reject(this[kError]);
          });
        });
      }

      // Overlapping next() calls are chained behind the previously returned
      // Promise. The common case (for await loops) only ever has one call
      // outstanding at a time, which is what the branch below favours.
      const previous = this[kLastPromise];
      if (!previous) {
        // Fast path: hand out data that is already readable without touching
        // the next() queue, which keeps multiple this.push() calls cheap.
        const chunk = this[kStream].read();
        if (chunk !== null) {
          return Promise.resolve(createIterResult(chunk, false));
        }
      }

      const queued = new Promise(
        previous ? wrapForNext(previous, this) : this[kHandlePromise]
      );
      this[kLastPromise] = queued;
      return queued;
    }
  },

  // return(): destroys the stream and returns a Promise that settles once
  // finished() reports back for it.
  return: {
    writable: true,
    enumerable: true,
    configurable: true,
    value: function () {
      return new Promise((resolve, reject) => {
        const source = this[kStream];

        // Register the completion callback before destroying the stream.
        finished(source, (failure) => {
          // A premature-close error is treated the same as a clean finish;
          // any other error rejects.
          if (!failure || failure.code === 'ERR_STREAM_PREMATURE_CLOSE') {
            resolve(createIterResult(undefined, true));
            return;
          }
          reject(failure);
        });

        source.destroy();
      });
    }
  }
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment