by Grzegorz Dyrda
W RxJava wszystkie strumienie są domyślnie "zimne" - czyli każda nowa subskrypcja powoduje skutki uboczne w źródle. Niestety, takie podejście sprawdza się jedynie w prostych "książkowych" przypadkach. Na co dzień, kiedy tworzymy logikę biznesową za pomocą łączenia/kombinacji strumieni, właściwość ta jest strasznie upierdliwa.
Przykład:
val request = Observable.fromCallable { /* wyślij request sieciowy i zwróć odpowiedź */ }
val task1 = request
.map { /* zrób coś z odpowiedzią */ }
.subscribe()
val task2 = request
.map { /* zrób coś innego z odpowiedzią */ }
.subscribe()
Co się stanie? Otóż request wyśle się DWA razy, a nie raz, jak byśmy się tego spodziewali.
To oczywiście tylko przykład pierwszy w brzegu. W prawdziwym projekcie, gdy kombinujemy strumienie na różne sposoby, takich przypadków o wiele, wiele wiecej (piszę to z własnego doświadczenia).
Aby temu zaradzić, musimy zamieniać strumienie na ciepłe/shared/multicast (to wszystko synonimy).
Poniżej są trzy przykłady:
- pierwsze dwa są sobie równoważne
- trzeci jest dodatkowo "lazy" - wykonanie lambdy jest odroczone do momentu pierwszego użycia
val obs1 = Observable.just(1).replay(1).refCount()
val obs2 = Observable.just(1).shareReplay()
val obs3 by multicast {
Observable.just(1)
}
W RxJava zwykły .map().reduce() wykonuje się jednowątkowo - nie ma żadnej współbieżności.
Poniższy map() wykona się dla każdego Usera na JEDNYM i tym samym wątku:
Observable.create { /* zwraca WIELE wyników - np. typu User */ }
.map { /* jakaś długo trwająca operacja, zwracająca Int */ }
.reduce { /* sumujemy Inty */ }
Ale przecież po to jest właśnie map-reduce, żeby map() się zrównoleglił, prawda? W RxJava robimy to za pomocą .flatMap { ...subscribeOn(...) } Ale to rozwiązanie jest po prostu nieczytelnie i nieintuicyjne. Zamiast tego możemy skorzystać z proste rozszerzenia .mapParallel().
W poniższym przykładzie kolejne wywołania mapParallel() będą zrównoleglone w puli wątków równej ilości rdzeni procesora:
Observable.create { /* zwraca WIELE wyników - np. typu User */ }
.mapParallel(computation()) { /* jakaś długo trwająca operacja, zwracająca Int */ }
.reduce { /* sumujemy Inty */ }