Skip to content

Instantly share code, notes, and snippets.

@brandonbloom
Last active September 8, 2017 21:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brandonbloom/e5919201687bf54d4c8a9896d79372ad to your computer and use it in GitHub Desktop.
Save brandonbloom/e5919201687bf54d4c8a9896d79372ad to your computer and use it in GitHub Desktop.
/**
 * A re-armable, promise-based notification.
 *
 * `waiter()` hands out the current pending promise. `broadcast(x)` resolves
 * that promise with `x` and immediately installs a fresh pending promise for
 * the next round, so a value is only seen by callers that fetched the promise
 * before the broadcast that resolved it.
 */
export class Signal<T> {
  // Both fields are assigned by _makePromise(), which the constructor calls.
  private _promise!: Promise<T>;
  private _resolve!: (x: T) => void;

  constructor() {
    this._makePromise();
  }

  /** Replaces the current promise with a new pending one and stores its resolver. */
  _makePromise() {
    let settle!: (x: T) => void;
    const fresh = new Promise<T>((resolve) => {
      // The executor runs synchronously, so `settle` is set before it is read below.
      settle = resolve;
    });
    this._resolve = settle;
    this._promise = fresh;
  }

  /** Resolves the current promise with `x`, then re-arms with a new promise. */
  broadcast(x: T) {
    const settleCurrent = this._resolve;
    settleCurrent(x);
    this._makePromise();
  }

  /** Returns the promise that the next `broadcast` call will resolve. */
  waiter(): Promise<T> {
    return this._promise;
  }
}
type RStream = NodeJS.ReadableStream;
type Chunk = string | Buffer;

/**
 * Adapts a readable stream into an async iterator over its chunks.
 *
 * The stream is paused each time a chunk arrives and resumed once the consumer
 * asks for the next value, so the consumer sets the pace. An 'error' event is
 * rethrown from the iterator; an 'end' event finishes the iteration.
 *
 * Events are queued in arrival order, so nothing is lost when an event fires
 * while the consumer is still busy with the previous chunk, or when several
 * events fire in the same tick.
 *
 * The listeners registered here are removed when the iteration finishes,
 * throws, or is abandoned early by the consumer. After that the stream is
 * left paused and its events are the caller's responsibility again.
 *
 * @param r The stream to read from.
 * @returns An async iterator yielding each chunk the stream emits.
 * @throws The error carried by the stream's 'error' event.
 */
let chunks = async function*(r: RStream): AsyncIterableIterator<Chunk> {
  // `null` marks end-of-stream; an Error marks a stream failure.
  const pending: Array<Chunk | Error | null> = [];
  // Set while the loop below is parked waiting for the next event.
  let wake: (() => void) | null = null;

  const enqueue = (event: Chunk | Error | null): void => {
    pending.push(event);
    if (wake !== null) {
      const notify = wake;
      wake = null;
      notify();
    }
  };
  const onData = (chunk: Chunk): void => {
    enqueue(chunk);
    // Hold further data until the consumer has taken this chunk.
    r.pause();
  };
  const onError = (err: Error): void => enqueue(err);
  const onEnd = (): void => enqueue(null);

  r.on('data', onData);
  r.on('error', onError);
  r.on('end', onEnd);
  try {
    while (true) {
      const event = pending.shift();
      if (event === undefined) {
        // Nothing queued yet: park until the next event arrives.
        await new Promise<void>((resolve) => {
          wake = resolve;
        });
        continue;
      }
      if (event === null) {
        return;
      }
      if (event instanceof Error) {
        throw event;
      }
      yield event;
      r.resume();
    }
  } finally {
    r.removeListener('data', onData);
    r.removeListener('error', onError);
    r.removeListener('end', onEnd);
  }
};
// Splits the stream's text into lines. Each yielded line has its terminating
// '\n' removed; any '\r' is left in place. Text after the final '\n' is
// yielded as one last line when it is non-empty.
let readLines = async function*(r: RStream): AsyncIterableIterator<string> {
  r.setEncoding('utf8');
  // Text received so far that has not yet been terminated by '\n'.
  let carry = '';
  for await (const piece of chunks(r) as AsyncIterable<string>) {
    const segments = (carry + piece).split('\n');
    // The final segment has no '\n' after it yet; hold it for the next chunk.
    carry = segments.pop() ?? '';
    for (const line of segments) {
      yield line;
    }
  }
  if (carry !== '') {
    yield carry;
  }
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment