Skip to content

Instantly share code, notes, and snippets.

@azz
Last active July 22, 2023 08:20
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save azz/35b045a11eae997e85f2da5ceb32972b to your computer and use it in GitHub Desktop.
Save azz/35b045a11eae997e85f2da5ceb32972b to your computer and use it in GitHub Desktop.
Extensions to Knockout and RxJS to allow observable conversion.
'use strict';
// Prototypal extensions to knockout and RxJS to fascilitate conversion.
// based on <http://bit.ly/226Yj2o>
import ko from 'knockout';
import Rx from 'rx';
/**
* knockout.subscribable to Rx.Observable
*/
// Knockout uses `fn` instead of `prototype`
ko.subscribable.fn.toRxObservable = function (startWithCurrentValue = false) {
return Rx.Observable.create(observer => {
// create a subscription, calling onNext on change
const koSubscription = this.subscribe(observer.onNext, observer);
// hack into the underlying ko.subscribable so that if it is an existing
// ko.subscription, its disposal terminates the Rx.Observable
if (this.dispose) {
const { dispose } = this;
this.dispose = (...args) => {
// call the underlying knockout disposal function
dispose.apply(this, args);
// call the observer's onCompleted
observer.onCompleted();
}
}
// start with the current value if applicable
if (startWithCurrentValue && ko.isObservable(this)) {
const currentValue = this();
currentValue === undefined || observer.onNext(currentValue);
}
// dispose of the ko.subscription when the Rx.Observable is disposed
return koSubscription.dispose.bind(koSubscription);
});
}
// Static helper from Rx.Observable, mirrors `fromPromise`, `fromEvent`, etc.
Rx.Observable.fromKnockout = (koSubscribable) => {
return koSubscribable.toRxObservable();
}
/**
* Rx.Observable to ko.computed
*/
Rx.Observable.prototype.toKnockoutComputed = function () {
const koObservable = ko.observable();
const rxDisposable = new Rx.SingleAssignmentDisposable;
const computed = ko.pureComputed(() => {
if (!rxDisposable.getDisposable()) {
// This is to prevent our computed from accidentally
// subscribing to any ko observables that happen to
// get evaluated during our call to this.subscribe().
ko.computed(() => {
const rxSubscription = this.subscribe(koObservable);
rxDisposable.setDisposable(rxSubscription);
}).dispose();
}
return koObservable();
});
const { dispose } = computed;
computed.dispose = function (...args) {
rxDisposable.dispose();
dispose.apply(this, args);
};
return computed;
};
interface KnockoutSubscribable<T> {
toRxObservable<T>(startWithCurrentValue?: boolean): Rx.Observable<T>;
}
namespace Rx {
interface DisposableStatic {
fromKnockout<T>(subscribable: KnockoutSubscribable<T>): Observable<T>;
}
interface Disposable<T> {
toKnockoutComputed(): KnockoutComputed<T>;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment