Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Scheduling Observable using Promises
/**
 * Builds an Observable that replays the items of an iterable.
 *
 * When the subscriber function runs, every item produced by `iterable` is
 * handed to `push` in iteration order. Exactly one terminal signal follows:
 * `close()` after the iteration finished normally, or `error(e)` if the
 * iteration (or a `push` call) threw.
 *
 * @param {Iterable<*>} iterable - Any object usable in a `for...of` loop.
 * @returns {Observable} An observable emitting the iterable's items.
 */
Observable.from = function (iterable) {
  return new Observable((push, error, close) => {
    try {
      for (const item of iterable) {
        push(item);
      }
    } catch (e) {
      // A failure is itself the terminal signal; do not also report a
      // normal close afterwards.
      error(e);
      return;
    }
    close();
  });
};
/**
 * Wraps the receiver in an Observable whose activity is scheduled through
 * promise jobs (microtasks) instead of running synchronously.
 *
 * - Subscribing to the source is postponed by one promise job.
 * - Each `next` / `throw` / `return` notification coming from the source is
 *   forwarded to the sink in its own promise job.
 * - The cancel function returned by `subscribe` marks the wrapper as aborted
 *   immediately and runs the underlying cancellation in a promise job.
 *
 * Invoke with the source as `this`, e.g. `defer.call(source)`.
 *
 * @this {{subscribe: Function}} The source; only `subscribe` is called on it.
 * @returns {Observable} An instance of an anonymous `Observable` subclass.
 */
function defer() {
  const source = this;
  // NOTE(review): this flag is shared by every subscription made through the
  // returned observable, so cancelling one subscription also stops any later
  // subscription from reaching the source — confirm whether that is intended.
  let aborted = false;

  return new class extends Observable {
    constructor() {
      super((sink) => {
        // Cancel function of the source subscription; stays undefined until
        // the deferred subscribe below has actually run.
        let cancel;

        Promise.resolve()
          .then(() => {
            if (aborted) {
              return;
            }
            cancel = source.subscribe({
              next(x) {
                Promise.resolve().then(() => sink.next(x));
              },
              throw(x) {
                Promise.resolve().then(() => sink.throw(x));
              },
              return(x) {
                Promise.resolve().then(() => sink.return(x));
              },
            });
          })
          // Chained rather than passed as the second argument of `then`, so
          // that an exception thrown while subscribing to the source is
          // reported to the sink instead of becoming an unhandled rejection.
          .catch((e) => sink.throw(e));

        return () => {
          if (cancel) {
            cancel();
          }
        };
      });
    }

    subscribe(...args) {
      const cancel = super.subscribe(...args);
      return () => {
        // Set synchronously so a not-yet-started subscription never starts.
        aborted = true;
        Promise.resolve().then(() => cancel());
      };
    }
  };
}
// Demo: wrap a four-element observable with defer() and hand each value to
// `print`. Uses `defer.call(source)` — the standard equivalent of the
// non-standard bind-operator form `source::defer()`.
// NOTE(review): `print` and `forEach` are not defined in this file; they are
// presumably supplied by the host / the Observable implementation — confirm.
defer.call(Observable.from([5, 4, 3, 2])).forEach(print);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.