Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
export function drained$<T>(pullStream: Function): Rx.Observable<T> {
return Rx.Observable.create(function subscribe(observer: Rx.Observer<T>) {
const drain = function drain(read: Function) {
read(null, function more(end: any | boolean, data: T) {
if (end === true) {
observer.complete();
return;
}
if (end) {
observer.error(end);
return;
}
observer.next(data);
read(null, more);
});
};
try {
drain(pullStream);
} catch (e) {
observer.error(e);
}
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.