Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Flip It and Reverse It
function toObservable(asyncIter) {
return new Observable(sink => {
let abort = false;
(async _=> {
try {
for async (let x of asyncIter) {
if (abort) return;
sink.next(x);
}
} catch (x) {
sink.throw(x);
}
sink.return();
})();
return _=> {
abort = true;
};
});
}
function mergeStreams(asyncIterList) {
let observableList = asyncIterList.map(toObservable);
let observable = new Observable(sink => {
let cancelList = [],
count = 0;
for (let observable of observableList) {
count += 1;
let cancel = observable.subscribe({
next(value) { return sink.next(value) },
throw(value) { return sink.throw(value) },
return() {
if (--count === 0)
sink.return();
},
});
cancelList.push(cancel);
}
return _=> {
for (let cancel of cancelList)
cancel();
};
});
return observable[Symbol.asyncIterator]();
}
// === A little test case ===
async function* g1() {
yield 1;
await 1;
yield 2;
}
async function* g2() {
yield 3;
await 1;
await 1;
yield 4;
}
async function* g3() {
await 1;
yield 5;
await 1;
yield 6;
await 1;
}
(async _=> {
for async (let x of mergeStreams([g1(), g2(), g3()])) {
console.log(x);
}
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.