Skip to content

Instantly share code, notes, and snippets.

@bman654
Created April 5, 2016 14:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bman654/92749cd93cdd84a540e41403f25a2105 to your computer and use it in GitHub Desktop.
Save bman654/92749cd93cdd84a540e41403f25a2105 to your computer and use it in GitHub Desktop.
JavaScript concatLatest operator for RxJS
Rx.Observable.prototype.concatLatest = function () {
    /// <summary>
    /// Concatenates an observable sequence of observable sequences, skipping sequences that arrive while the current sequence is being observed.
    /// If N new observables arrive while the current observable is being observed, the first N-1 new observables will be thrown
    /// away and only the Nth will be observed.
    /// The result completes once the outer sequence has completed and no inner sequence is being observed or waiting.
    /// An error from the outer sequence or from the observed inner sequence is forwarded to the observer.
    /// </summary>
    /// <returns type="Rx.Observable"></returns>
    var source = this;
    return Rx.Observable.create(function (observer) {
        var latest,             // most recent inner sequence that arrived while busy; undefined when none is waiting
            isStopped = false,  // true once the outer sequence has completed
            isBusy = false,     // true while an inner sequence is being observed
            outerSubscription = new Rx.SingleAssignmentDisposable(),
            innerSubscription = new Rx.SerialDisposable(),
            subscriptions = new Rx.CompositeDisposable(outerSubscription, innerSubscription),
            onError = observer.onError.bind(observer),
            onNext = observer.onNext.bind(observer);

        function subscribeToInner(inner) {
            // Reserve the serial slot BEFORE subscribing. If `inner` completes
            // synchronously and innerOnComplete re-enters to subscribe to a newer
            // sequence, that newer subscription takes over the slot; assigning this
            // (already finished) subscription afterwards must not replace and
            // dispose it. The placeholder absorbs the late assignment instead.
            var slot = new Rx.SingleAssignmentDisposable();
            innerSubscription.setDisposable(slot);
            slot.setDisposable(inner.subscribe(onNext, onError, innerOnComplete));
        }

        function innerOnComplete() {
            var next = latest;
            if (next) {
                // A sequence arrived while we were busy: observe it now and stay busy.
                latest = undefined;
                subscribeToInner(next);
            } else {
                isBusy = false;
                if (isStopped) {
                    // Outer sequence already finished and nothing is waiting.
                    observer.onCompleted();
                }
            }
        }

        outerSubscription.setDisposable(source.subscribe(function (newInner) {
            if (isBusy) {
                // Overwrites any sequence that was already waiting; only the latest survives.
                latest = newInner;
            } else {
                isBusy = true;
                subscribeToInner(newInner);
            }
        }, onError, function () {
            isStopped = true;
            if (!isBusy) {
                observer.onCompleted();
            }
            // Otherwise innerOnComplete signals completion when the last inner sequence ends.
        }));

        return subscriptions;
    });
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment