Last active
March 1, 2020 19:08
-
-
Save ksm2/cc5e3542819d6c2699cb4c0a0b44d494 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Readable } from 'stream'; | |
import ReadableStreamIterable from './ReadableStreamIterable'; | |
describe('ReadableStreamIterable', () => { | |
it('consumes pre-pushed items', async () => { | |
const r = new Readable({ objectMode: true }); | |
const i = new ReadableStreamIterable(r); | |
r.push(1); | |
r.push(2); | |
r.push(3); | |
r.push(null); | |
expect(await i.next()).toStrictEqual({ done: false, value: 1 }); | |
expect(await i.next()).toStrictEqual({ done: false, value: 2 }); | |
expect(await i.next()).toStrictEqual({ done: false, value: 3 }); | |
expect(await i.next()).toStrictEqual({ done: true, value: undefined }); | |
}); | |
it('consumes post-pushed items', async () => { | |
const r = new Readable({ objectMode: true }); | |
const i = new ReadableStreamIterable(r); | |
let p1 = i.next(); | |
let p2 = i.next(); | |
let p3 = i.next(); | |
let p4 = i.next(); | |
r.push(1); | |
r.push(2); | |
r.push(3); | |
r.push(null); | |
expect(await p1).toStrictEqual({ done: false, value: 1 }); | |
expect(await p2).toStrictEqual({ done: false, value: 2 }); | |
expect(await p3).toStrictEqual({ done: false, value: 3 }); | |
expect(await p4).toStrictEqual({ done: true, value: undefined }); | |
}); | |
it('consumes mixed items', async () => { | |
const r = new Readable({ objectMode: true }); | |
const i = new ReadableStreamIterable(r); | |
let p1 = i.next(); | |
r.push(1); | |
let p2 = i.next(); | |
r.push(2); | |
let p3 = i.next(); | |
r.push(3); | |
let p4 = i.next(); | |
r.push(null); | |
expect(await p1).toStrictEqual({ done: false, value: 1 }); | |
expect(await p2).toStrictEqual({ done: false, value: 2 }); | |
expect(await p3).toStrictEqual({ done: false, value: 3 }); | |
expect(await p4).toStrictEqual({ done: true, value: undefined }); | |
}); | |
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ObjectReadable } from '@grpc/grpc-js/build/src/object-stream'; | |
const END_OF_STREAM = Symbol('END_OF_STREAM'); | |
type QueueItem<T> = T | typeof END_OF_STREAM; | |
class ReadableStreamIterable<T> implements AsyncIterableIterator<T> { | |
private readonly queue: QueueItem<T>[] = []; | |
private readonly waiters: ((next: QueueItem<T>) => void)[] = []; | |
constructor(readable: ObjectReadable<T>) { | |
readable.on('data', this.handleData.bind(this)); | |
readable.on('end', this.handleEnd.bind(this)); | |
readable.on('error', this.handleError.bind(this)); | |
} | |
[Symbol.asyncIterator](): AsyncIterableIterator<T> { | |
return this; | |
} | |
async next(): Promise<IteratorResult<T, undefined>> { | |
const next = await this.awaitNextElement(); | |
if (next === END_OF_STREAM) return { done: true, value: undefined }; | |
return { done: false, value: next }; | |
} | |
private awaitNextElement(): Promise<QueueItem<T>> { | |
return new Promise((resolve) => { | |
this.waiters.push(resolve); | |
this.process(); | |
}) | |
} | |
private handleData(chunk: T): void { | |
this.queue.push(chunk); | |
this.process(); | |
} | |
private handleEnd(): void { | |
this.queue.push(END_OF_STREAM); | |
this.process(); | |
} | |
private handleError(reason?: any): void { | |
throw reason; | |
} | |
private process(): void { | |
if (!this.queue.length || !this.waiters.length) { | |
return; | |
} | |
const nextItem = this.queue.shift()!; | |
const waiter = this.waiters.shift()!; | |
waiter(nextItem); | |
this.process(); | |
} | |
} | |
export default ReadableStreamIterable; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment