Skip to content

Instantly share code, notes, and snippets.

@staltz
Created May 9, 2017 12:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save staltz/f2fca31c3a43002cef6449838724dcf0 to your computer and use it in GitHub Desktop.
Save staltz/f2fca31c3a43002cef6449838724dcf0 to your computer and use it in GitHub Desktop.
/**
 * Wraps a pull-stream source in an Observable by draining it.
 *
 * On subscription the source is read repeatedly with `pullStream(null, cb)`.
 * Each callback is interpreted as follows:
 *  - `end === true`      -> the observer is completed;
 *  - any other truthy `end` -> the observer is errored with that value;
 *  - otherwise           -> `data` is forwarded through `observer.next`.
 *
 * Unsubscribing (or the observer becoming closed while values are being
 * delivered) aborts the source with `pullStream(true, cb)` so that it stops
 * producing and can release whatever it holds.
 *
 * @param pullStream Source function invoked as `pullStream(abort, cb)`, where
 *   `cb` is called back as `cb(end, data)`.
 * @returns An Observable that emits every value read from `pullStream`.
 */
export function drained$<T>(pullStream: Function): Rx.Observable<T> {
  return Rx.Observable.create(function subscribe(observer: Rx.Observer<T>) {
    // Set once the source has ended/errored or has been asked to abort.
    // After that no new reads are issued and late callbacks are ignored.
    let done = false;

    // Teardown: ask the source to stop, at most once, and only if it has not
    // already terminated by itself.
    const abort = function abort(): void {
      if (done) {
        return;
      }
      done = true;
      try {
        pullStream(true, function ignore() {});
      } catch (e) {
        // The consumer is already gone, so there is nobody left to notify.
      }
    };

    // Reads in a loop instead of recursing from inside the callback, so a
    // source that answers synchronously cannot grow the call stack per item.
    const drain = function drain(): void {
      let looping = true;
      while (looping) {
        let calledBack = false;
        pullStream(null, function more(end: any | boolean, data: T) {
          calledBack = true;
          if (done) {
            looping = false;
            return;
          }
          if (end === true) {
            done = true;
            looping = false;
            observer.complete();
            return;
          }
          if (end) {
            done = true;
            looping = false;
            observer.error(end);
            return;
          }
          observer.next(data);
          if (observer.closed) {
            // The consumer stopped listening while handling this value.
            looping = false;
            abort();
            return;
          }
          if (!looping) {
            // This callback arrived asynchronously, after the loop below had
            // already given up waiting; start a fresh read loop.
            drain();
          }
        });
        if (!calledBack) {
          // The source will answer later; the callback resumes the draining.
          looping = false;
        }
      }
    };

    try {
      drain();
    } catch (e) {
      if (!done) {
        done = true;
        observer.error(e);
      }
    }

    return abort;
  });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment