Skip to content

Instantly share code, notes, and snippets.

@zone117x
Last active January 12, 2021 14:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zone117x/db830217dd6721360c2dc18924a6e033 to your computer and use it in GitHub Desktop.
Save zone117x/db830217dd6721360c2dc18924a6e033 to your computer and use it in GitHub Desktop.
Readline transform stream
import * as readline from 'readline';
import * as stream from 'stream';
export async function* asyncIterableToGenerator<T>(iter: AsyncIterable<T>) {
for await (const entry of iter) {
yield entry;
}
}
export class LineReaderStream extends stream.Duplex {
asyncGen: AsyncGenerator<string, void, unknown>;
readlineInstance: readline.Interface;
passthrough: stream.Duplex;
constructor(opts?: stream.DuplexOptions) {
super({ readableObjectMode: true, ...opts });
this.passthrough = new stream.PassThrough();
this.readlineInstance = readline.createInterface({
input: this.passthrough,
crlfDelay: Infinity,
});
this.asyncGen = asyncIterableToGenerator(this.readlineInstance);
}
async _read(size: number) {
for (let i = 0; i < size; i++) {
const chunk = await this.asyncGen.next();
if (!this.push(chunk.done ? null : chunk.value)) {
break;
}
}
}
_write(chunk: any, encoding: string, callback: (error?: Error | null) => void) {
this.passthrough.write(chunk, encoding, callback);
}
_destroy(error: any, callback: (error: Error | null) => void) {
this.passthrough.destroy(error);
this.readlineInstance.close();
callback(error);
}
_final(callback: (error?: Error | null) => void) {
this.passthrough.end(() => callback());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment