Skip to content

Instantly share code, notes, and snippets.

@zenparsing
Last active September 28, 2015 14:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zenparsing/9ab5a930fdf43f47de77 to your computer and use it in GitHub Desktop.
Save zenparsing/9ab5a930fdf43f47de77 to your computer and use it in GitHub Desktop.
Observable zip with Various Return Types
function zip(list) {
return new Observable(sink => {
function trySend() {
// If every buffer has at least one element, then send an array
// containing every first element and remove those elements from the
// buffer
if (streams.every(s => s.buffer.length > 0))
sink.next(streams.map(s => s.buffer.shift()));
tryComplete();
}
function tryComplete() {
// If there is a stream which is completed and whose buffer is empty,
// then send a complete signal to the sink
if (streams.some(s => s.completed && s.buffer.length === 0))
sink.complete();
}
let streams = Array.from(list, observable => {
let stream = { buffer: [], cancel: null, completed: false };
stream.cancel = observable.subscribe({
next(v) {
if (stream.buffer.push(v) === 1)
trySend();
},
error(e) {
return sink.error(e);
},
complete() {
stream.completed = true;
tryComplete();
},
});
return stream;
});
return _=> {
// On cleanup, cancel each subscription
for (let stream of streams)
stream.cancel();
};
});
}
function zip(list) {
return new Observable(sink => {
function trySend() {
// If every buffer has at least one element, then send an array
// containing every first element and remove those elements from the
// buffer
if (streams.every(s => s.buffer.length > 0))
sink.next(streams.map(s => s.buffer.shift()));
tryComplete();
}
function tryComplete() {
// If there is a stream which is completed and whose buffer is empty,
// then send a complete signal to the sink
if (streams.some(s => s.subscription.isUnsubscribed() && s.buffer.length === 0))
sink.complete();
}
let streams = Array.from(list, observable => {
let stream = { buffer: [], subscription: null };
stream.subscription = observable.subscribe({
next(v) {
if (stream.buffer.push(v) === 1)
trySend();
},
error(e) {
return sink.error(e);
},
complete() {
tryComplete();
},
});
return stream;
});
return _=> {
// On cleanup, cancel each subscription
for (let stream of streams)
stream.subscription.unsubscribe();
};
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment