Skip to content

Instantly share code, notes, and snippets.

@royra
Created November 26, 2019 09:43
Show Gist options
  • Save royra/988d541c84a5509538576c460092fd2e to your computer and use it in GitHub Desktop.
Save royra/988d541c84a5509538576c460092fd2e to your computer and use it in GitHub Desktop.
Convert an "old" JS stream to AsyncIterable
//
// Convert an "old" stream to AsyncIterable.
//
// Streams are "push" interfaces and iterators are "pull" interfaces, so there needs
// to be some kind of queue to accumulate the pushed data while waiting for "pulls",
// i.e., calls to next(). In this implementation the queue is the pendingValues array.
//
// Note: This is not needed for current Node.js Readable streams, as they are
// already AsyncIterable.
//
/**
 * Minimal event-subscription surface that the adapter in this file relies on.
 * Only `on` is required; each overload returns `this` so calls can be chained.
 */
interface OldReadable {
  /** Subscribe to the "close" event; the listener receives no arguments. */
  on(event: "close", listener: () => void): this;

  /** Subscribe to the "data" event; the listener receives one chunk per event. */
  on(event: "data", listener: (chunk: any) => void): this;

  /** Subscribe to the "end" event; the listener receives no arguments. */
  on(event: "end", listener: () => void): this;

  /** Subscribe to the "error" event; the listener receives the error. */
  on(event: "error", listener: (err: Error) => void): this;
}
/**
 * The two settle callbacks of a promise, captured so that it can be
 * resolved or rejected later from outside its executor.
 */
type PromiseResolver<T> = {
  /** Fulfils the promise with `result`. */
  resolve: (result: T) => void;
  /** Rejects the promise with `err`. */
  reject: (err: Error) => void;
};
/**
 * Adapts a push-style, event-emitting stream to a pull-style `AsyncIterable`.
 *
 * Listeners are attached to `stream` immediately. Every "data" chunk is either
 * handed to the oldest waiting `next()` call or buffered in `pendingValues`
 * until it is pulled.
 *
 * Ordering guarantees of `next()`:
 *  1. buffered values are delivered first,
 *  2. then a recorded stream error is reported (as a rejection),
 *  3. then "end"/"close" is reported as `{ done: true }`.
 *
 * Any number of `next()` calls may be outstanding at once; they are settled in
 * the order they were made.
 *
 * Notes:
 *  - All iterators obtained from the returned iterable share the same queue,
 *    so each value is delivered to only one of them.
 *  - The stream is never paused, so chunks are buffered without bound when the
 *    consumer pulls more slowly than the stream pushes.
 *
 * @param stream Event source emitting "data", "end", "close" and "error".
 * @returns An async iterable over the chunks emitted by `stream`.
 */
export const asAsyncIterable = <T>(stream: OldReadable): AsyncIterable<T> => {
  let done = false
  // Tracked separately from `error` so that a falsy/undefined error value
  // emitted by the stream is still recognised as a failure.
  let failed = false
  let error: Error | undefined
  // FIFO of next() calls that arrived while no value was buffered. It is only
  // ever non-empty while pendingValues is empty.
  const pendingNextPromises: PromiseResolver<IteratorResult<T>>[] = []
  const pendingValues: T[] = []

  const onEnd = () => {
    done = true
    // Nothing more will arrive: release every waiter, not just one.
    for (const waiter of pendingNextPromises.splice(0)) {
      waiter.resolve({ done: true, value: undefined })
    }
  }
  stream.on('end', onEnd)
  stream.on('close', onEnd)

  stream.on('error', err => {
    failed = true
    error = err
    for (const waiter of pendingNextPromises.splice(0)) {
      waiter.reject(err)
    }
  })

  stream.on('data', (value: T) => {
    const waiter = pendingNextPromises.shift()
    if (waiter) {
      waiter.resolve({ done: false, value })
    } else {
      pendingValues.push(value)
    }
  })

  return {
    [Symbol.asyncIterator](): AsyncIterator<T> {
      return {
        next(): Promise<IteratorResult<T>> {
          // Return pending values before errors. The length check cannot be
          // replaced by shift() plus an undefined test, since T itself might
          // include undefined.
          if (pendingValues.length > 0) {
            const value = pendingValues.shift() as T
            return Promise.resolve({ done: false, value })
          }
          // Return errors before "end".
          if (failed) {
            return Promise.reject(error)
          }
          if (done) {
            return Promise.resolve({ done: true, value: undefined })
          }
          // No value available yet: queue this call until the stream emits.
          return new Promise((resolve, reject) => {
            pendingNextPromises.push({ resolve, reject })
          })
        },
      }
    },
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment