Skip to content

Instantly share code, notes, and snippets.

@ksm2
Last active March 1, 2020 19:08
Show Gist options
  • Save ksm2/cc5e3542819d6c2699cb4c0a0b44d494 to your computer and use it in GitHub Desktop.
Save ksm2/cc5e3542819d6c2699cb4c0a0b44d494 to your computer and use it in GitHub Desktop.
import { Readable } from 'stream';
import ReadableStreamIterable from './ReadableStreamIterable';
// Spec for ReadableStreamIterable: values pushed into an object-mode Readable
// must come back out of `next()` in order, followed by a `done` result, no
// matter whether the pushes happen before, after, or interleaved with the
// `next()` calls.
describe('ReadableStreamIterable', () => {
  // Builds a fresh object-mode stream together with the iterable wrapping it.
  const createSubject = () => {
    const source = new Readable({ objectMode: true });
    const iterable = new ReadableStreamIterable(source);
    return { source, iterable };
  };

  it('consumes pre-pushed items', async () => {
    const { source, iterable } = createSubject();

    // Everything, including the end-of-stream marker, is pushed up front.
    source.push(1);
    source.push(2);
    source.push(3);
    source.push(null);

    const first = await iterable.next();
    expect(first).toStrictEqual({ done: false, value: 1 });
    const second = await iterable.next();
    expect(second).toStrictEqual({ done: false, value: 2 });
    const third = await iterable.next();
    expect(third).toStrictEqual({ done: false, value: 3 });
    const last = await iterable.next();
    expect(last).toStrictEqual({ done: true, value: undefined });
  });

  it('consumes post-pushed items', async () => {
    const { source, iterable } = createSubject();

    // All four reads are requested before any value exists.
    const first = iterable.next();
    const second = iterable.next();
    const third = iterable.next();
    const last = iterable.next();

    source.push(1);
    source.push(2);
    source.push(3);
    source.push(null);

    expect(await first).toStrictEqual({ done: false, value: 1 });
    expect(await second).toStrictEqual({ done: false, value: 2 });
    expect(await third).toStrictEqual({ done: false, value: 3 });
    expect(await last).toStrictEqual({ done: true, value: undefined });
  });

  it('consumes mixed items', async () => {
    const { source, iterable } = createSubject();

    // Each read request is immediately followed by the push that satisfies it.
    const first = iterable.next();
    source.push(1);
    const second = iterable.next();
    source.push(2);
    const third = iterable.next();
    source.push(3);
    const last = iterable.next();
    source.push(null);

    expect(await first).toStrictEqual({ done: false, value: 1 });
    expect(await second).toStrictEqual({ done: false, value: 2 });
    expect(await third).toStrictEqual({ done: false, value: 3 });
    expect(await last).toStrictEqual({ done: true, value: undefined });
  });
});
import { ObjectReadable } from '@grpc/grpc-js/build/src/object-stream';
/** Sentinel handed to a waiter when the stream has no more values. */
const END_OF_STREAM = Symbol('END_OF_STREAM');

/** What a pending `next()` call is resolved with: a chunk or the end marker. */
type QueueItem<T> = T | typeof END_OF_STREAM;

/** One buffered stream event, tagged so that errors and chunks cannot be confused. */
type StreamEvent<T> =
  | { readonly kind: 'data'; readonly chunk: T }
  | { readonly kind: 'end' }
  | { readonly kind: 'error'; readonly reason: unknown };

/** The settle callbacks of one pending `next()` call. */
interface Waiter<T> {
  resolve(next: QueueItem<T>): void;
  reject(reason: unknown): void;
}

/**
 * Adapts an object-mode readable stream to the async iterator protocol.
 *
 * Events emitted by the stream are buffered in arrival order and paired
 * one-to-one with `next()` calls, whichever side arrives first.
 *
 * - `'data'` resolves a `next()` call with `{ done: false, value }`.
 * - `'end'` resolves a `next()` call with `{ done: true }`; every later
 *   `next()` call resolves the same way instead of waiting forever.
 * - `'error'` rejects the `next()` call that reaches it (after all chunks
 *   buffered before it have been delivered); later calls resolve as done.
 *
 * The `'data'` listener buffers every chunk unconditionally, so no
 * backpressure is applied to the source.
 */
class ReadableStreamIterable<T> implements AsyncIterableIterator<T> {
  /** Events received from the stream that no `next()` call has consumed yet. */
  private readonly queue: StreamEvent<T>[] = [];

  /** `next()` calls that are waiting for an event, oldest first. */
  private readonly waiters: Waiter<T>[] = [];

  /** True once an end/error event has been buffered; later events are ignored. */
  private closed = false;

  /** True once the end/error event has been handed to a consumer. */
  private finished = false;

  /**
   * @param readable Stream to consume. Listeners for `'data'`, `'end'` and
   *   `'error'` are attached immediately.
   */
  constructor(readable: ObjectReadable<T>) {
    readable.on('data', (chunk: T) => this.handleData(chunk));
    readable.on('end', () => this.handleEnd());
    readable.on('error', (reason: unknown) => this.handleError(reason));
  }

  /** The instance is its own async iterator. */
  [Symbol.asyncIterator](): AsyncIterableIterator<T> {
    return this;
  }

  /**
   * Waits for the next stream event.
   *
   * @returns `{ done: false, value }` for a chunk, `{ done: true }` once the
   *   stream has ended or failed.
   * @throws The reason emitted with the stream's `'error'` event, exactly once.
   */
  async next(): Promise<IteratorResult<T, undefined>> {
    const next = await this.awaitNextElement();
    if (next === END_OF_STREAM) return { done: true, value: undefined };
    return { done: false, value: next };
  }

  /** Registers a waiter and immediately tries to pair it with a buffered event. */
  private awaitNextElement(): Promise<QueueItem<T>> {
    return new Promise<QueueItem<T>>((resolve, reject) => {
      this.waiters.push({ resolve, reject });
      this.process();
    });
  }

  private handleData(chunk: T): void {
    this.enqueue({ kind: 'data', chunk });
  }

  private handleEnd(): void {
    this.enqueue({ kind: 'end' });
  }

  /**
   * Buffers the failure instead of throwing: throwing inside an event
   * listener propagates to the emitter rather than to whoever awaits `next()`.
   */
  private handleError(reason?: unknown): void {
    this.enqueue({ kind: 'error', reason });
  }

  /** Buffers an event unless a terminal event has already been buffered. */
  private enqueue(event: StreamEvent<T>): void {
    if (this.closed) return;
    if (event.kind !== 'data') this.closed = true;
    this.queue.push(event);
    this.process();
  }

  /** Pairs waiters with buffered events until one side runs out. */
  private process(): void {
    while (this.waiters.length > 0 && (this.finished || this.queue.length > 0)) {
      const waiter = this.waiters.shift();
      if (waiter === undefined) return;

      // Once the terminal event was delivered the queue stays empty and
      // every remaining or future waiter simply learns the stream is over.
      const event = this.finished ? undefined : this.queue.shift();
      if (event === undefined) {
        waiter.resolve(END_OF_STREAM);
        continue;
      }

      switch (event.kind) {
        case 'data':
          waiter.resolve(event.chunk);
          break;
        case 'end':
          this.finished = true;
          waiter.resolve(END_OF_STREAM);
          break;
        case 'error':
          this.finished = true;
          waiter.reject(event.reason);
          break;
      }
    }
  }
}
export default ReadableStreamIterable;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment