Skip to content

Instantly share code, notes, and snippets.

@zenparsing
Last active August 29, 2015 14:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zenparsing/91852861fccd8a8ca956 to your computer and use it in GitHub Desktop.
Save zenparsing/91852861fccd8a8ca956 to your computer and use it in GitHub Desktop.
mergeStreams with Async Iterators and Observables
async function* flattenStreams(asyncIterList) {
function deferred() {
let capability = {};
capability.promise = new Promise((resolve, reject) => {
capability.resolve = resolve;
capability.reject = reject;
});
return capability;
}
// We maintain a queue of promises for the received items
let top = deferred(),
queue = [top],
total = 0,
finished = 0,
cleanup = false;
for (let iter of asyncIterList) {
total += 1;
// Spawn a "worker" for each input stream
(async iter => {
try {
// For each item, resolve the promise at the back of the queue
// and push another unresolved promise.
for async (let x of iter) {
if (cleanup) return;
top.resolve(x);
queue.push(top = deferred());
}
} catch (x) {
// Same thing for errors
top.reject(x);
queue.push(top = deferred());
} finally {
finished += 1;
}
})(iter);
}
// Finally, yield everything that gets put into the queue until
// all input streams are finished
try {
while (finished < total || queue.length > 0) {
yield await queue.shift().promise;
}
} finally {
cleanup = true;
}
}
// === A little test case ===
async function* g1() {
yield 1;
await 1;
yield 2;
}
async function* g2() {
yield 3;
await 1;
await 1;
yield 4;
}
async function* g3() {
await 1;
yield 5;
await 1;
yield 6;
await 1;
}
(async _=> {
for async (let x of flattenStreams([g1(), g2(), g3()])) {
console.log(x);
}
})();
function mergeStreams(observableList) {
return new Observable(sink => {
let cancelList = [],
count = 0;
for (let observable of observableList) {
count += 1;
let cancel = observable.subscribe({
next(value) { return sink.next(value) },
throw(value) { return sink.throw(value) },
return() {
if (--count === 0)
sink.return();
},
});
cancelList.push(cancel);
}
return _=> {
for (let cancel of cancelList)
cancel();
};
});
}
let o1 = new Observable(sink => {
sink.next(1);
sink.next(2);
sink.return();
});
let o2 = new Observable(sink => {
sink.next(3);
sink.next(4);
sink.return();
});
let o3 = new Observable(sink => {
sink.next(5);
sink.next(6);
sink.return();
});
mergeStreams([o1, o2, o3]).forEach(x => {
console.log(x);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment