Skip to content

Instantly share code, notes, and snippets.

@tetsuharuohzeki
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tetsuharuohzeki/b509daf046cfe00cca72 to your computer and use it in GitHub Desktop.
Save tetsuharuohzeki/b509daf046cfe00cca72 to your computer and use it in GitHub Desktop.
「RxJSのObservableのHotとColdの実装を眺めた」. version controlled souce for http://saneyukis.hatenablog.com/entry/2015/03/28/142422

RxJSのObservableのHotとColdの実装を眺めた

RxJSの実装が長らく気になっていた。動機としては、来月の歌舞伎座.tech#7「Reactive Extensions」の予習の意味合いもあるが、個人的な興味が一番。コードベースはRxJS 2.4.6@089dca5。ロジックとしては結構ややこしいので、この記事も間違いがあるかもしれない点に注意。

コード雑感

本当にただの感想。

だいたい2010~12年頃のJSコードだし、TypeScriptでもないし、インターフェースの名前も独特ではあるけれど、Reactive ExtensionsがそもそもC#由来であるという前提を踏まえると、まあ、こんなもんかといった感じ。連結ベースのビルドシステムにせよ、よく回っているとは思う。

独自のユーティリティ関数などが多いし、TypeScriptでES3ターゲットで吐いた方がコードとしての統一感は出るだろうが、ファイルベース + 連結して初めて全容が完成するビルドシステムを活かした複数のバイナリエディションの提供があるため、TypeScriptでやってもそこまで綺麗にはならないと思う。そもそも Observable<T> のメソッド数が非常に多いのもあるし。どうしても綺麗にしようとすると、C的なifdefプリプロセッサのような原始的なアプローチも必要になるのではないかと感じる。

そういうコードの癖を除けば、非常によくできているコードだと思う。

Observable と Observer による値の伝播

おそらくここが最難関だと感じる。プライベート関数を良く使うこともありJSDoc的な型アノテーションもそんなに多く無いので、コードを行ったり来たりすることになる。

具体例がないとイメージしにくいので、こういうコードを例に話を進めることにする。

let source = Rx.Observable.range(0, 3);
let mapped =  source.map(function mapFn(x)=> {
    return x * x;
});

let observer = Rx.Observer.create(function onNext(x) {
    console.log('Next: ' + x);
});
let subscribed = mapped.subscribe(observer);

ここで observeronNextのコールバックしか与えていないのは、問題を単純化するため. 値をsourceから伝播させる方法さえわかれば、まあ細かい注意はあるにせよ、だいたい似たようなアプローチでonErroronCompleteも実装できるので省略する.

これを考えるにあたって、まず、Observable の生成を順に見ていく.

まずは Rx.Observable.rangeRx.Observable.mapだが、この2つがやっていることに大差はない。この2つがやっているのは、(戻り値となる)対応するObservableの生成と、それぞれのメソッドに応じた処理を記述しているだけ。ここで__重要なのは、このObservableがどのように後続に渡されるか、および、自身がどのようにsubscribeされるか・どのように値を受け取るか__になる。これらのObservableの基底オブジェクトである、ObservableBaseを見てみる。

ちなみに、RxJSのリポジトリ内には,

の二カ所にoperatorの定義があり、それぞれでmaprangeの実装が存在している。が、前者のperfはパフォーマンスを改善したもので、2.4系から有効になったもので、改善指針がリリースノートに書いてある。このリリースノートを、もっと早く見つけていれば色々類推を重ねる必要もなかったのに…… というわけで、今回はperfの方の実装を参照する。

MapObservableは、MapObservable-> ObservableBase -> Observableという継承関係を持っているが、責務としては (operator特有の処理) -> Observableのチェーンを繋ぐのに必要な処理 -> 最もプリミティブなObservableとしての処理 と分かれている。最も基底層に位置するObservableは本当に最低限のpublic APIとそれに関連する処理を提供するだけにとどまっているため、実質的な処理は一段上で定義されているケースが多いようである。

このObservableBaseはコンストラクタとして呼び出されると、subscribeを引数として自身の基底オブジェクトObservableを呼び出す。Observableは引数に渡された関数をthis._subscribeに保持するだけだが、この this._subscribe は、自身がsubscribeされた際に呼び出され、実行結果を返すという点を覚えておきたい。

Observerによるチェーンの構築

ここで、引数として渡されたsubscribeを見てみる。今回読んでいるリビジョンでは以下の通り。

    function fixSubscriber(subscriber) {
      return subscriber && isFunction(subscriber.dispose) ? subscriber :
        isFunction(subscriber) ? disposableCreate(subscriber) : disposableEmpty;
    }

    function setDisposable(s, state) {
      var ado = state[0], self = state[1];
      var sub = tryCatch(self.subscribeCore).call(self, ado);

      if (sub === errorObj) {
        if(!ado.fail(errorObj.e)) { return thrower(errorObj.e); }
      }
      ado.setDisposable(fixSubscriber(sub));
    }

    function subscribe(observer) {
      var ado = new AutoDetachObserver(observer), state = [ado, this];

      if (currentThreadScheduler.scheduleRequired()) {
        currentThreadScheduler.scheduleWithState(state, setDisposable);
      } else {
        setDisposable(null, state);
      }
      return ado;
    }

    function ObservableBase() {
      __super__.call(this, subscribe);
    }

    ObservableBase.prototype.subscribeCore = notImplemented;

ここからがObservableとObserver(そしてDisposable)の処理を実現する根幹となる。実際、かなりややこしく、デバッガで追っても理解するのには一苦労する。

  1. まず、mapped.subscribe()が呼び出されると、Obserbale._subscribeに登録したコールバック関数が、引数をobserverとして呼び出される。上に引用したsubscribeがそれだ。
  2. 呼び出されたobserverを元に、AutoDetachObserverのインスタンスであるadoが生成される。AutoDetachObseverは自身のメンバとして渡されたobserverと、SingleAssignmentDisposableを保持している。
  3. adoおよびthisコンテクスト( MapObservable )、コールバックとしてsetDisposableの3つがSchedulerに投げ込まれる。
    1. Schedulerに投げ込まれたのち、setDisposableが実行される。第一引数のsはSchedulerなので無視して、第二引数のstateだけ見ればよい
      1. ここで、self.subscribeCoreすなわちMapObservable.prototype.subscribeCoreadoを引数として実行される。このadosubscribe()の引数として渡されたものである点に注意.
      2. subscribeCore()は、adoを元にMapObserverを生成する. これは、__自身の一つ上流となるObservable(ここではsource)の値を受け取り、そこにmap(fn)で渡されたfnを適用した上で、後続(abo)に渡すObserver__である
      3. 生成されたMapObserverを引数として、source.subscribe()が呼び出され、結果 A を返す。つまりsource.subscribe()にとってのaboが _A_である
    2. 返ってきた A を引数として、ado.setDisposableが呼び出される. これは、SingleAssignmentDisposable.setDisposable()を呼び出しているだけなので、そのまま ASingleAssignmentDisposable.currentへと収まる
      • ここで、既にcurrentに値が設定されている場合、それのdispose()が走るのだが、今回は特にそんなことはないので無視する
  4. adoが戻り値として返る、すなわちadosubscribe()の戻り値として返る。つまり、これがサンプルコード中のsubscribedの値となる。

以上の処理により、以下の関係が構築できた.

  • abo === サンプルコード中のsubscribed
  • source <- mapMapObserver) <- aboAutoDetachObserver ) <- observerといった方向でsubscribeの関係がある
    • sourceの値をobserverまで伝播できることがわかる
    • AutoDetachObserverobserverを包んだDisposableなので、実質的には無いものと思って良い
  • sourceの戻り値であるDisposableabo.m.currentSingleAssignmentDisposable)に参照されている
    • これにより、abo.dispose()することで、abo.m.current.dispose()を呼び出せる

今回はこれだけでrootまで到達したが、仮に間に挟まるoperatorが長大な物となったとしても、同様の処理をrootとなるObservableまで再帰的に繰り返すことで、以下のことが実現できることがわかる

  • Cold Observable
    • subscribeされるまで実行されない遅延挙動の実現
    • subscribeに応じて常に新規に値を返すObservableの実現
  • Disposable.dispose()を呼び出す事で、一連のObservableの連鎖全体をdisposeする

余談ではあるが、 MapObservableFilterObservableではinternalMapないしinternalFilterというメソッドを所持しており、以下のように同じoperatorが連続して続くケースでは、自身ではなく自身のsourceをsourceとしてObservableを生成することでsubscribeのコストを下げている。

var mapped = source.map((i) => {
    return i^2;
}).map((i) => {
    return String(i);
});

var filter = source.filter((x) => {
    return (x % 2) === 0;
}).filter((x) => {
    return x < 10;
});

ややこしいので整理

以上の流れを、整理・一般化すると以下のようなフローになる(はず。ややこしいのでミスあるかも)

表現上の前提

  • 現在のoperatorの位置を_n_とした場合、自身のsourceとなるObservableに属する物の位置を_n-1_、自身をsubscribeするObserverの位置を_n+1_として表す
  • Observable.subscribe()の引数を input、返り値を output とする
  • map()などで渡されたmap関数の実行に必要となるObserverを便宜上, _InnerObserver_と呼称する
    • ECMA262 PromiseにおけるDeferredに類似した概念

operatorを適用してObservableを生成する

  1. subscribe時の挙動を定めた_onSubscribeCallback(n)_を定義する
  2. _onSubscribeCallback(n)_を引数として、_Observable(n)_を生成する
  3. sourceとなる_Observable(n-1)_への参照を_Observable(n)_に保持させる
  4. _Observable(n)_をoperatorメソッドを呼び出した結果として返す

subscribeによるチェーンを作る

  1. Observable(n-1)Observable(n)、_Observer(n+1)_が存在する物とする
  2. _Observer(n+1)を_input_として_Observable(n).subscribe() を呼び出す
    1. _onSubscribeCallback(n)_が実行される
      1. _InnerObserver(n)_が生成される
      2. _InnerObserver(n)_に_Observer(n+1)_を保持させる
      3. _Disposable(n)_が生成される
      4. _Disposable(n)_に_InnerObserver(n)_を保持させる
      5. _Disposable(n)_を_onSubscribeCallback(n)_の結果として返す
    2. _onSubscribeCallback(n)_の結果を、Observable(n).subscribe()の output とする
    3. _InnerObserver(n)_を引数として_Observable(n-1)_をsubscribeした結果を_Disposable(n-1)_とする
    4. _Disposable(n)_に_Disposable(n-1)_を参照させる
    5. output を返す
  3. 2の処理をrootに到達するまで再帰的に実行する

値の伝播

以下の順番で値が伝播することで、root由来の値がleafまで到達できる

  1. InnerObserver(n-1).onNext()
  2. InnerObserver(n).onNext()
  3. Observer(n+1).onNext()

dispose

以下の順序でdispose処理が連鎖することで、leaf->rootまでdisposeを伝播させることができる。InnerObserverもdisposeするのは、中継処理を抑えるため。

  1. Disposable(n).dispose()
    • ここで同時に_InnerObserver(n)_.dispose()
  2. Disposable(n-1).dispose()
    • ここで同時に_InnerObserver(n-1)_.dispose()

簡易的な実装を作る

TypeScriptで簡易実装を作ると(たぶん)こんな感じになる。schedulerに関する実装は本質ではないので除外している。なお、コード中のコメントは、subscribe時のコードの実行順序をメモ書き程度につけている。

class Observable<T> {
    private _subscribe: (o: IObserver<T>) => Disposable;

    constructor(fn: (o: IObserver<T>) => Disposable) {
        this._subscribe = fn;
    }

    subscribe(observer: IObserver<T>): Disposable {
        // 2, 5
        var disposable = this._subscribe(observer);
        return disposable;
    }

    map(fn: (t: T) => T): Observable<T> {
        var source = this;
        return new Observable<T>((child) => {
            // 3.
            var inner = new InnerObserver<T>(function onNext(t: T) {
                // 8
                try {
                    var result = fn(t);
                    child.onNext(result);
                }
                catch (e) {
                    child.onError(e);
                }
            },
            function onError(e) {
                child.onError(e);
            },
            function onComplete() {
                child.onComplete();
            });
            // 4.
            var parentDisposable = source.subscribe(inner);
            var disposable = new Disposable(()=> {
                inner.dispose();
                parentDisposable.dispose();
                console.log('map is disposed!');
            })
            return disposable;
        });
    }
}

interface IObserver<T> {
    onNext(t: T);
    onError(e: Error);
    onComplete();
}

interface IDisposable {
    dispose();
}

class Observer<T> implements IObserver<T> {
    private _onNext: (t: T) => void;
    private _onError: (e: Error) => void;
    private _onComplete: () => void;

    constructor(onNext: (t: T) => void,
            onError?: (e: Error) => void,
            onComplete?: () => void) {
        this._onNext = onNext;
        this._onError = (typeof onError === 'function') ?
            onError : ((e: Error): void => {
                console.log('error!');
            });
        this._onComplete = (typeof onError === 'function') ?
            onComplete : ((): void => {
                console.log('complete');
            });
    }

    onNext(t: T) {
        this._onNext(t);
    }

    onError(e: Error) {
        this._onError(e);
    }

    onComplete() {
        this._onComplete();
    }
}

class InnerObserver<T> implements IObserver<T>, IDisposable {
    private _onNext: (t: T) => void;
    private _onError: (e: Error) => void;
    private _onComplete: () => void;
    private isDisposed: boolean;

    constructor(onNext: (t: T) => void,
            onError: (e: Error) => void,
            onComplete: () => void) {
        this._onNext = onNext;
        this._onError = onError
        this._onComplete = onComplete;
        this.isDisposed = false;
    }

    onNext(t: T) {
        if (!this.isDisposed) {
            this._onNext(t);
        }
    }

    onError(e: Error) {
        if (!this.isDisposed) {
            this._onError(e);
        }
    }

    onComplete() {
        if (!this.isDisposed) {
            this.isDisposed = true;
            this._onComplete();
        }
    }

    dispose() {
        this.isDisposed = true;
        this._onNext = null;
        this._onError = null;
        this._onComplete = null;
    }
}

class Disposable implements IDisposable {
    private action: () => void;
    private isDisposed: boolean;

    constructor(action: () => void) {
        this.action = action;
    }

    dispose(): void {
        if (!this.isDisposed) {
            this.isDisposed = true;
            var action = this.action;
            this.action = null;
            action();
        }
    }
}

var source = (new Observable<number>((o) => {
    var inner = new InnerObserver<number>((t) => {
        o.onNext(t);
    },
    (e) => {
        o.onError(e);
    },
    () => {
        o.onComplete();
    });

    window.setTimeout(() => {
        for (var i: number = 0; i <= 3; ++i) {
            inner.onNext(i);
        }
        inner.onComplete();
    }, 200);

    return new Disposable(()=>{
        inner.dispose();
        console.log('source is disposed!');
    });
}));

var mapped = source.map((i) => {
    return i * i;
})

var subscribed = mapped.subscribe(new Observer((i) => { // 1.
    // 9.
    console.log(i);
}));


var shouldExecute = true;
if (shouldExecute) {
    // 0
    // 1
    // 2
    // 9
}
else {
    subscribed.dispose(); //disposed!
}

Subject

前章でColdなObservableの実現方法について解読したが、Hot Observableはどのように実現しているのか。 これは極めて単純でSubjectが中継地点になっているだけ。Cold Observableの仕組みがわかれば、なんてことはない。 Rx.Observable.multicastConnectableObservableを作って返すのだが、これは要はSubjectのラッパーで、ConnectableObservableがsubscribeされると Rx.Observable.multicastの引数としたSubjectのsubscribe()メソッドを呼び出しているだけに過ぎない.

Subjectは、subscribeされても自身の祖先方向への再帰的なsubscribeを行わず、自身の保有する配列に新たなObserverを追加して、新たにInnerSubscriptionを返す。

Subject.onNext()が呼び出されると、自身が保有する全てのObserverのonNext()を呼び出す。同様に自身を購読するObserverの子孫がdispose()を呼んだ場合、保有する配列から対象のObserverのチェーンを削除するだけにとどまる。

Subjectに祖先方法へのsubscribeをやらせるには、該当するSubjectをラップしているConnectableObservableconnect()を呼び出してやれば良い。Subject -> sourceが開通し CompositeDisposableが返ってくる。この辺りは各種チュートリアルでも触れられている通りだよね。

これで、Hot Observableの挙動は実現される。簡単ですね。 まとめると、こうなる:

ConnectableObservableのsubscribe時

  1. Observer(n) を引数として、ConnectableObservable(n-1) をsubscribeする
  2. 自身のラップする_Subject(n-1)_を、Observer(n) を引数としてsubscribeする
  3. _Subject(n-1)_は、自身の持つObserverの配列に Observer(n) を加えて、InnerSubscription(n)(m) を生成して返す
    • m は、同一のObservableへのsubscribeが何度目かを表す

Subjectの開通

  1. ConnectableObservable(n).connect() を呼び出す
  2. _Subject(n)_を引数として、sourceである_Observable(n-1)_をsubscribeする
  3. ConnectableObservable(n).connect() の戻り値として, 新しい CompositeDisposable(n) を返す.
    • この_CompositeDisposable(n)_は、step 2でsubscribeした結果である_Disposable(n-1)_を保持しているため、この_CompositeDisposable(n)_をdisposeすることで、_Subject(n)_のsource群をdisposeしていく
    • _CompositeDisposable(n)_は、同時に一つしか生成されない(disposeされると、新しいインスタンスが生成される)

Scheduler

Schedulerの実装について、これも結構コードがややこしいが、Observer/Observableほどではないので省略。

まとめ

  • Cold Observableは、Observerのチェーンが毎回生成され、コールバックが毎回実行されるために、値が毎回新規作成されるに過ぎない
  • Hot Observableは、チェーンの途中までを他のチェーンと共有しているに過ぎない

参考

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment