Skip to content

Instantly share code, notes, and snippets.

@GrzegorzDyrda
Last active June 30, 2017 06:24
Show Gist options
  • Save GrzegorzDyrda/6ed3fe86edb7ea0c1159f29c287ae2ea to your computer and use it in GitHub Desktop.
Save GrzegorzDyrda/6ed3fe86edb7ea0c1159f29c287ae2ea to your computer and use it in GitHub Desktop.
Pragmatic Observable extensions.

Observable Extensions - przydatne rozszerzenia do pracy z RxJava

by Grzegorz Dyrda


1. Współdzielenie strumieni

W RxJava wszystkie strumienie są domyślnie "zimne" - czyli każda nowa subskrypcja powoduje skutki uboczne w źródle. Niestety, takie podejście sprawdza się jedynie w prostych "książkowych" przypadkach. Na co dzień, kiedy tworzymy logikę biznesową za pomocą łączenia/kombinacji strumieni, właściwość ta jest strasznie upierdliwa.

Przykład:

val request = Observable.fromCallable { /* wyślij request sieciowy i zwróć odpowiedź */ }

val task1 = request
  .map { /* zrób coś z odpowiedzią */ }
  .subscribe()

val task2 = request
  .map { /* zrób coś innego z odpowiedzią */ }
  .subscribe()

Co się stanie? Otóż request wyśle się DWA razy, a nie raz, jak byśmy się tego spodziewali.

To oczywiście tylko przykład pierwszy w brzegu. W prawdziwym projekcie, gdy kombinujemy strumienie na różne sposoby, takich przypadków o wiele, wiele wiecej (piszę to z własnego doświadczenia).

Aby temu zaradzić, musimy zamieniać strumienie na ciepłe/shared/multicast (to wszystko synonimy).

Poniżej są trzy przykłady:

  • pierwsze dwa są sobie równoważne
  • trzeci jest dodatkowo "lazy" - wykonanie lambdy jest odroczone do momentu pierwszego użycia
val obs1 = Observable.just(1).replay(1).refCount()

val obs2 = Observable.just(1).shareReplay()

val obs3 by multicast {
  Observable.just(1)
}

2. Map-Reduce

W RxJava zwykły .map().reduce() wykonuje się jednowątkowo - nie ma żadnej współbieżności.

Poniższy map() wykona się dla każdego Usera na JEDNYM i tym samym wątku:

Observable.create { /* zwraca WIELE wyników - np. typu User  */ }
  .map { /* jakaś długo trwająca operacja, zwracająca Int */ }
  .reduce { /* sumujemy Inty */ }

Ale przecież po to jest właśnie map-reduce, żeby map() się zrównoleglił, prawda? W RxJava robimy to za pomocą .flatMap { ...subscribeOn(...) } Ale to rozwiązanie jest po prostu nieczytelnie i nieintuicyjne. Zamiast tego możemy skorzystać z proste rozszerzenia .mapParallel().

W poniższym przykładzie kolejne wywołania mapParallel() będą zrównoleglone w puli wątków równej ilości rdzeni procesora:

Observable.create { /* zwraca WIELE wyników - np. typu User  */ }
  .mapParallel(computation()) { /* jakaś długo trwająca operacja, zwracająca Int */ }
  .reduce { /* sumujemy Inty */ }
import io.reactivex.android.schedulers.AndroidSchedulers
//
// Observable extensions.
//
fun <T> Observable<T>.doOnNextOnUI(onNext: (T) -> Unit): Observable<T> {
return observeOn(AndroidSchedulers.mainThread()).doOnNext(onNext)
}
/**
* This is an alias for [replay()][Observable.replay].[refCount()][ConnectableObservable.refCount].
*
* Source: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/sharereplay.md
*/
fun <T> Observable<T>.shareReplay(bufferSize: Int = 1): Observable<T> {
return replay(bufferSize).refCount()
}
/**
* Zrównoleglona mapa. Czyli [flatMap][Observable.flatMap] { [fromCallable][Observable.fromCallable] { tu nasza lambda }.subscribeOn(scheduler) }.
*/
inline fun <T, R> Observable<T>.mapParallel(scheduler: Scheduler, crossinline mapper: (T) -> R): Observable<R> {
return flatMap {
Observable.fromCallable { mapper(it) }
.subscribeOn(scheduler)
}
}
fun <T> Observable<T>.toInfinite(): Observable<T> {
return mergeWith(Observable.never())
}
/**
* Delegat.
* Zwraca leniwy współdzielony strumień. Czyli [lazy()][lazy]+[shareReplay()][shareReplay].
*/
fun <T> multicast(initializer: () -> Observable<T>) = lazy {
initializer().shareReplay()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment