RxJSの実装が長らく気になっていた。動機としては、来月の歌舞伎座.tech#7「Reactive Extensions」の予習の意味合いもあるが、個人的な興味が一番。コードベースはRxJS 2.4.6@089dca5。ロジックとしては結構ややこしいので、この記事も間違いがあるかもしれない点に注意。
本当にただの感想。
だいたい2010~12年頃のJSコードだし、TypeScriptでもないし、インターフェースの名前も独特ではあるけれど、Reactive ExtensionsがそもそもC#由来であるという前提を踏まえると、まあ、こんなもんかといった感じ。連結ベースのビルドシステムにせよ、よく回っているとは思う。
独自のユーティリティ関数などが多いし、TypeScriptでES3ターゲットで吐いた方がコードとしての統一感は出るだろうが、ファイルベース + 連結して初めて全容が完成するビルドシステムを活かした複数のバイナリエディションの提供があるため、TypeScriptでやってもそこまで綺麗にはならないと思う。そもそも Observable<T>
のメソッド数が非常に多いのもあるし。どうしても綺麗にしようとすると、C的なifdefプリプロセッサのような原始的なアプローチも必要になるのではないかと感じる。
そういうコードの癖を除けば、非常によくできているコードだと思う。
おそらくここが最難関だと感じる。プライベート関数を良く使うこともありJSDoc的な型アノテーションもそんなに多く無いので、コードを行ったり来たりすることになる。
具体例がないとイメージしにくいので、こういうコードを例に話を進めることにする。
let source = Rx.Observable.range(0, 3);
let mapped = source.map(function mapFn(x)=> {
return x * x;
});
let observer = Rx.Observer.create(function onNext(x) {
console.log('Next: ' + x);
});
let subscribed = mapped.subscribe(observer);
ここで observer
にonNext
のコールバックしか与えていないのは、問題を単純化するため.
値をsource
から伝播させる方法さえわかれば、まあ細かい注意はあるにせよ、だいたい似たようなアプローチでonError
もonComplete
も実装できるので省略する.
これを考えるにあたって、まず、Observable
の生成を順に見ていく.
まずは Rx.Observable.range
と Rx.Observable.map
だが、この2つがやっていることに大差はない。この2つがやっているのは、(戻り値となる)対応するObservableの生成と、それぞれのメソッドに応じた処理を記述しているだけ。ここで__重要なのは、このObservableがどのように後続に渡されるか、および、自身がどのようにsubscribeされるか・どのように値を受け取るか__になる。これらのObservableの基底オブジェクトである、ObservableBase
を見てみる。
ちなみに、RxJSのリポジトリ内には,
の二カ所にoperatorの定義があり、それぞれでmap
やrange
の実装が存在している。が、前者のperf
はパフォーマンスを改善したもので、2.4系から有効になったもので、改善指針がリリースノートに書いてある。このリリースノートを、もっと早く見つけていれば色々類推を重ねる必要もなかったのに…… というわけで、今回はperf
の方の実装を参照する。
MapObservable
は、MapObservable
-> ObservableBase
-> Observable
という継承関係を持っているが、責務としては (operator特有の処理)
-> Observableのチェーンを繋ぐのに必要な処理
-> 最もプリミティブなObservableとしての処理
と分かれている。最も基底層に位置するObservable
は本当に最低限のpublic APIとそれに関連する処理を提供するだけにとどまっているため、実質的な処理は一段上で定義されているケースが多いようである。
このObservableBase
はコンストラクタとして呼び出されると、subscribe
を引数として自身の基底オブジェクトObservable
を呼び出す。Observable
は引数に渡された関数をthis._subscribe
に保持するだけだが、この this._subscribe
は、自身がsubscribeされた際に呼び出され、実行結果を返すという点を覚えておきたい。
ここで、引数として渡されたsubscribe
を見てみる。今回読んでいるリビジョンでは以下の通り。
function fixSubscriber(subscriber) {
return subscriber && isFunction(subscriber.dispose) ? subscriber :
isFunction(subscriber) ? disposableCreate(subscriber) : disposableEmpty;
}
function setDisposable(s, state) {
var ado = state[0], self = state[1];
var sub = tryCatch(self.subscribeCore).call(self, ado);
if (sub === errorObj) {
if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); }
}
ado.setDisposable(fixSubscriber(sub));
}
function subscribe(observer) {
var ado = new AutoDetachObserver(observer), state = [ado, this];
if (currentThreadScheduler.scheduleRequired()) {
currentThreadScheduler.scheduleWithState(state, setDisposable);
} else {
setDisposable(null, state);
}
return ado;
}
function ObservableBase() {
__super__.call(this, subscribe);
}
ObservableBase.prototype.subscribeCore = notImplemented;
ここからがObservableとObserver(そしてDisposable)の処理を実現する根幹となる。実際、かなりややこしく、デバッガで追っても理解するのには一苦労する。
- まず、
mapped.subscribe()
が呼び出されると、Obserbale._subscribe
に登録したコールバック関数が、引数をobserver
として呼び出される。上に引用したsubscribe
がそれだ。 - 呼び出されたobserverを元に、
AutoDetachObserver
のインスタンスであるado
が生成される。AutoDetachObsever
は自身のメンバとして渡されたobserver
と、SingleAssignmentDisposable
を保持している。 ado
およびthis
コンテクスト(MapObservable
)、コールバックとしてsetDisposable
の3つがSchedulerに投げ込まれる。- Schedulerに投げ込まれたのち、
setDisposable
が実行される。第一引数のs
はSchedulerなので無視して、第二引数のstate
だけ見ればよい- ここで、
self.subscribeCore
すなわちMapObservable.prototype.subscribeCore
がado
を引数として実行される。このado
はsubscribe()
の引数として渡されたものである点に注意. subscribeCore()
は、ado
を元にMapObserver
を生成する. これは、__自身の一つ上流となるObservable(ここではsource
)の値を受け取り、そこにmap(fn)
で渡されたfn
を適用した上で、後続(abo
)に渡すObserver__である- 生成された
MapObserver
を引数として、source.subscribe()
が呼び出され、結果 A を返す。つまりsource.subscribe()
にとってのabo
が _A_である
- ここで、
- 返ってきた A を引数として、
ado.setDisposable
が呼び出される. これは、SingleAssignmentDisposable.setDisposable()
を呼び出しているだけなので、そのまま A はSingleAssignmentDisposable.current
へと収まる- ここで、既に
current
に値が設定されている場合、それのdispose()
が走るのだが、今回は特にそんなことはないので無視する
- ここで、既に
- Schedulerに投げ込まれたのち、
ado
が戻り値として返る、すなわちado
がsubscribe()
の戻り値として返る。つまり、これがサンプルコード中のsubscribed
の値となる。
以上の処理により、以下の関係が構築できた.
abo
=== サンプルコード中のsubscribed
source
<-map
(MapObserver
) <-abo
(AutoDetachObserver
) <-observer
といった方向でsubscribeの関係があるsource
の値をobserver
まで伝播できることがわかるAutoDetachObserver
はobserver
を包んだDisposableなので、実質的には無いものと思って良い
source
の戻り値であるDisposable
はabo.m.current
(SingleAssignmentDisposable
)に参照されている- これにより、
abo.dispose()
することで、abo.m.current.dispose()
を呼び出せる
- これにより、
今回はこれだけでrootまで到達したが、仮に間に挟まるoperatorが長大な物となったとしても、同様の処理をrootとなるObservableまで再帰的に繰り返すことで、以下のことが実現できることがわかる
- Cold Observable
- subscribeされるまで実行されない遅延挙動の実現
- subscribeに応じて常に新規に値を返すObservableの実現
- Disposable.dispose()を呼び出す事で、一連のObservableの連鎖全体をdisposeする
余談ではあるが、 MapObservable
や FilterObservable
ではinternalMap
ないしinternalFilter
というメソッドを所持しており、以下のように同じoperatorが連続して続くケースでは、自身ではなく自身のsourceをsourceとしてObservableを生成することでsubscribeのコストを下げている。
var mapped = source.map((i) => {
return i^2;
}).map((i) => {
return String(i);
});
var filter = source.filter((x) => {
return (x % 2) === 0;
}).filter((x) => {
return x < 10;
});
以上の流れを、整理・一般化すると以下のようなフローになる(はず。ややこしいのでミスあるかも)
- 現在のoperatorの位置を_n_とした場合、自身のsourceとなるObservableに属する物の位置を_n-1_、自身をsubscribeするObserverの位置を_n+1_として表す
- Observable.subscribe()の引数を input、返り値を output とする
map()
などで渡されたmap関数の実行に必要となるObserverを便宜上, _InnerObserver_と呼称する- ECMA262 PromiseにおけるDeferredに類似した概念
- subscribe時の挙動を定めた_onSubscribeCallback(n)_を定義する
- _onSubscribeCallback(n)_を引数として、_Observable(n)_を生成する
- sourceとなる_Observable(n-1)_への参照を_Observable(n)_に保持させる
- _Observable(n)_をoperatorメソッドを呼び出した結果として返す
- Observable(n-1)、Observable(n)、_Observer(n+1)_が存在する物とする
- _Observer(n+1)を_input_として_Observable(n).subscribe() を呼び出す
- _onSubscribeCallback(n)_が実行される
- _InnerObserver(n)_が生成される
- _InnerObserver(n)_に_Observer(n+1)_を保持させる
- _Disposable(n)_が生成される
- _Disposable(n)_に_InnerObserver(n)_を保持させる
- _Disposable(n)_を_onSubscribeCallback(n)_の結果として返す
- _onSubscribeCallback(n)_の結果を、Observable(n).subscribe()の output とする
- _InnerObserver(n)_を引数として_Observable(n-1)_をsubscribeした結果を_Disposable(n-1)_とする
- _Disposable(n)_に_Disposable(n-1)_を参照させる
- output を返す
- _onSubscribeCallback(n)_が実行される
- 2の処理をrootに到達するまで再帰的に実行する
以下の順番で値が伝播することで、root由来の値がleafまで到達できる
- InnerObserver(n-1).onNext()
- InnerObserver(n).onNext()
- Observer(n+1).onNext()
以下の順序でdispose処理が連鎖することで、leaf->rootまでdisposeを伝播させることができる。InnerObserverもdisposeするのは、中継処理を抑えるため。
- Disposable(n).dispose()
- ここで同時に_InnerObserver(n)_.dispose()
- Disposable(n-1).dispose()
- ここで同時に_InnerObserver(n-1)_.dispose()
TypeScriptで簡易実装を作ると(たぶん)こんな感じになる。schedulerに関する実装は本質ではないので除外している。なお、コード中のコメントは、subscribe時のコードの実行順序をメモ書き程度につけている。
class Observable<T> {
private _subscribe: (o: IObserver<T>) => Disposable;
constructor(fn: (o: IObserver<T>) => Disposable) {
this._subscribe = fn;
}
subscribe(observer: IObserver<T>): Disposable {
// 2, 5
var disposable = this._subscribe(observer);
return disposable;
}
map(fn: (t: T) => T): Observable<T> {
var source = this;
return new Observable<T>((child) => {
// 3.
var inner = new InnerObserver<T>(function onNext(t: T) {
// 8
try {
var result = fn(t);
child.onNext(result);
}
catch (e) {
child.onError(e);
}
},
function onError(e) {
child.onError(e);
},
function onComplete() {
child.onComplete();
});
// 4.
var parentDisposable = source.subscribe(inner);
var disposable = new Disposable(()=> {
inner.dispose();
parentDisposable.dispose();
console.log('map is disposed!');
})
return disposable;
});
}
}
interface IObserver<T> {
onNext(t: T);
onError(e: Error);
onComplete();
}
interface IDisposable {
dispose();
}
class Observer<T> implements IObserver<T> {
private _onNext: (t: T) => void;
private _onError: (e: Error) => void;
private _onComplete: () => void;
constructor(onNext: (t: T) => void,
onError?: (e: Error) => void,
onComplete?: () => void) {
this._onNext = onNext;
this._onError = (typeof onError === 'function') ?
onError : ((e: Error): void => {
console.log('error!');
});
this._onComplete = (typeof onError === 'function') ?
onComplete : ((): void => {
console.log('complete');
});
}
onNext(t: T) {
this._onNext(t);
}
onError(e: Error) {
this._onError(e);
}
onComplete() {
this._onComplete();
}
}
class InnerObserver<T> implements IObserver<T>, IDisposable {
private _onNext: (t: T) => void;
private _onError: (e: Error) => void;
private _onComplete: () => void;
private isDisposed: boolean;
constructor(onNext: (t: T) => void,
onError: (e: Error) => void,
onComplete: () => void) {
this._onNext = onNext;
this._onError = onError
this._onComplete = onComplete;
this.isDisposed = false;
}
onNext(t: T) {
if (!this.isDisposed) {
this._onNext(t);
}
}
onError(e: Error) {
if (!this.isDisposed) {
this._onError(e);
}
}
onComplete() {
if (!this.isDisposed) {
this.isDisposed = true;
this._onComplete();
}
}
dispose() {
this.isDisposed = true;
this._onNext = null;
this._onError = null;
this._onComplete = null;
}
}
class Disposable implements IDisposable {
private action: () => void;
private isDisposed: boolean;
constructor(action: () => void) {
this.action = action;
}
dispose(): void {
if (!this.isDisposed) {
this.isDisposed = true;
var action = this.action;
this.action = null;
action();
}
}
}
var source = (new Observable<number>((o) => {
var inner = new InnerObserver<number>((t) => {
o.onNext(t);
},
(e) => {
o.onError(e);
},
() => {
o.onComplete();
});
window.setTimeout(() => {
for (var i: number = 0; i <= 3; ++i) {
inner.onNext(i);
}
inner.onComplete();
}, 200);
return new Disposable(()=>{
inner.dispose();
console.log('source is disposed!');
});
}));
var mapped = source.map((i) => {
return i * i;
})
var subscribed = mapped.subscribe(new Observer((i) => { // 1.
// 9.
console.log(i);
}));
var shouldExecute = true;
if (shouldExecute) {
// 0
// 1
// 2
// 9
}
else {
subscribed.dispose(); //disposed!
}
前章でColdなObservableの実現方法について解読したが、Hot Observableはどのように実現しているのか。
これは極めて単純でSubjectが中継地点になっているだけ。Cold Observableの仕組みがわかれば、なんてことはない。
Rx.Observable.multicastはConnectableObservableを作って返すのだが、これは要はSubjectのラッパーで、ConnectableObservable
がsubscribeされると Rx.Observable.multicastの引数としたSubjectのsubscribe()
メソッドを呼び出しているだけに過ぎない.
Subjectは、subscribeされても自身の祖先方向への再帰的なsubscribeを行わず、自身の保有する配列に新たなObserverを追加して、新たにInnerSubscription
を返す。
Subject.onNext()
が呼び出されると、自身が保有する全てのObserverのonNext()
を呼び出す。同様に自身を購読するObserverの子孫がdispose()
を呼んだ場合、保有する配列から対象のObserverのチェーンを削除するだけにとどまる。
Subjectに祖先方法へのsubscribeをやらせるには、該当するSubjectをラップしているConnectableObservable
のconnect()
を呼び出してやれば良い。Subject -> sourceが開通し CompositeDisposable
が返ってくる。この辺りは各種チュートリアルでも触れられている通りだよね。
これで、Hot Observableの挙動は実現される。簡単ですね。 まとめると、こうなる:
- Observer(n) を引数として、ConnectableObservable(n-1) をsubscribeする
- 自身のラップする_Subject(n-1)_を、Observer(n) を引数としてsubscribeする
- _Subject(n-1)_は、自身の持つObserverの配列に Observer(n) を加えて、InnerSubscription(n)(m) を生成して返す
- m は、同一のObservableへのsubscribeが何度目かを表す
- ConnectableObservable(n).connect() を呼び出す
- _Subject(n)_を引数として、sourceである_Observable(n-1)_をsubscribeする
- ConnectableObservable(n).connect() の戻り値として, 新しい CompositeDisposable(n) を返す.
- この_CompositeDisposable(n)_は、step 2でsubscribeした結果である_Disposable(n-1)_を保持しているため、この_CompositeDisposable(n)_をdisposeすることで、_Subject(n)_のsource群をdisposeしていく
- _CompositeDisposable(n)_は、同時に一つしか生成されない(disposeされると、新しいインスタンスが生成される)
Schedulerの実装について、これも結構コードがややこしいが、Observer/Observableほどではないので省略。
- Cold Observableは、Observerのチェーンが毎回生成され、コールバックが毎回実行されるために、値が毎回新規作成されるに過ぎない
- Hot Observableは、チェーンの途中までを他のチェーンと共有しているに過ぎない