RxJava 2์ ์ฐ์ฐ์ ๊ฒฐํฉ (Operator fusion in RxJava 2) https://proandroiddev.com/operator-fusion-in-rxjava-2-dcd6612cffae
์์ฑ: Vasya Drobushkov
RxJava๋ ๊ฐ๋ ฅํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ง๋ง ๋ช๊ฐ์ง ๋ฌธ์ ๋ ์์ต๋๋ค. ํนํ ์ฑ๋ฅ๊ณผ ๋ฉ๋ชจ๋ฆฌ ๋ฌธ์ ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ํ๋ ค๋ ๋ฌธ์ ์ ๊ธฐ์ ์ ์ธ ๊ด์ ์์ ์๋ฃจ์ ์ด ์ค๊ณ๋๋ ๋ฐฉ์์์ ๋น๋กฏ๋ฉ๋๋ค.
RxJava๋ ๊ฐ์ ๋น์ฉ(overhead)๋ฅผ ์ต์ํํ๊ธฐ ์ํด "์ฐ์ฐ์ ๊ฒฐํฉ" (Operator fusion)์ด๋ผ ๋ถ๋ฆฌ๋ ์ฌ๋ผ ๊ฐ์ง ์ต์ ํ๋ฅผ ๊ฐ์ง๋๋ค. ๊ทธ๋ฆฌ๊ณ ์ฐ๋ฆฌ๋ ์ด ๊ธ์์ ๊ฑฐ๊ธฐ์ ๋ํด ๋ค๋ฃฐ ์์ ์ ๋๋ค.
ํ์ง๋ง ๋จผ์ RxJava ๋ฐ์ํ ํ์ ์ด ์ด๋ป๊ฒ ๋์ํ๋์ง์ ๋ฌด์จ ๋ฌธ์ ๊ฐ ์๋์ง ์์ฝํ๊ฒ ์ต๋๋ค.
Observable๋ฅผ ์ฌ์ฉํ ๋ ํฌ๊ฒ ์ธ๊ฐ์ง ๋ถ๋ถ์ด ์์ต๋๋ค. Observable, Observer, Disposable.
์ฐ๋ฆฌ ๋ชจ๋ Observable์ ์๊ณ ์๊ณ ์ด๋ป๊ฒ ๋ง๋๋์ง ์๋๋ค. (์: Observable.just("Hello, World!")
) Observable์ ๊ฐ Rx ์ฒด์ธ์ ์์ฑ ๋ธ๋ก์
๋๋ค.Observable์ ์๋ํ๋ ค๋ฉด subscribe(...)
๋ฉ์๋์ Observer
๋ฅผ ์ ๋ฌํด ๊ตฌ๋
ํด์ผํฉ๋๋ค.
Observer๋ ๊ธฐ๋ณธ์ ์ผ๋ก onSubscribe
, onNext
, onComplete
์ฝ๋ฐฑ์ ๊ฐ์ง ์ธํฐํ์ด์ค์
๋๋ค.
Observer๊ฐ Observable์ ๊ตฌ๋
ํ๋ฉด (Observer๊ฐ ๋์ค์ ํ์ํ ๋ Rx ์ฒด์ธ์ ์์ ๊ธฐ(dispose) ์ํด) Observable์ Disposable ๊ฐ์ฒด๋ฅผ ์์ฑํ๊ณ onSubscribe
์ฝ๋ฐฑ์ ํตํด Observer๋ก ์ ๋ฌํฉ๋๋ค.
์ด ์์
์ด ์๋ฃ๋๋ฉด, ํต์ ์ด ์ฐ๊ฒฐ๋๊ณ Observable์ด ์ถ๊ฐ์ ์ธ ๋๊ธฐ์์ด onNext
๋ฅผ ํตํด ๊ฐ๋ค์ ๋ณด๋ผ ์ ์์ต๋๋ค.
๊ทธ๋์ Observable์ ๋ฐฐ์(backpressure)์ ์ง์ํ์ง ์์ต๋๋ค. Observer๊ฐ Observable์๊ฒ ๋ ์ด์ ๊ฐ์ ๋ณด๋ด์ง ๋ง๋ผ ์๋ฆด ๋ฐฉ๋ฒ์ด ์์ต๋๋ค.
Flowable์ ๋ชจ๋ ๊ฒ์ด ๋น์ทํฉ๋๋ค. ํ์ง๋ง Observer์ Disposable ๋์ Subscriber์ Subscription๋ฅผ ๊ฐ์ง๋๋ค.
Subscription์ ์ถ๊ฐ๋ก request(n)
๋ฉ์๋๊ฐ ์์ต๋๋ค. ์ด๋ฅผ ์ด์ฉํ์ฌ Subscriber๋ ๋ช
์์ ์ผ๋ก Flowable์๊ฒ ์์ฒญ๋ ์์ ์์ดํ
์ ๋ณด๋ด๊ฒ ์์ฒญํ ์ ์์ต๋๋ค.
๊ฐ์ ์์ฒญํ์ง ์์ผ๋ฉด Flowable์ ์ด๋ค ๊ฐ๋ ๋ด๋ณด๋ด์ง ์์ต๋๋ค. ์ด๊ฒ Flowable์ด ๋ฐฐ์์ ์ง์ํ๋ ์ด์ ์ ๋๋ค.
RxJava ๋ฐ์ํ ํ์ ์ ์ฌ์ฉํ ๋ ๋ ์ค์ํ ๋จ๊ณ์ธ ์กฐ๋ฆฝ(assembly)๊ณผ ๊ตฌ๋ (subscribe)์ด ์์ต๋๋ค.
์กฐ๋ฆฝํ ๋ Rx ์ฒด์ธ์ด ๋ง๋ค์ด์ง๊ณ ๊ตฌ๋ ํ ๋ ์ฐ๋ฆฌ๋ Rx ์ฒด์ธ์ ์์ํฉ๋๋ค.
์๋์ ์์ ๋ฅผ ๊ณ ๋ คํด๋ณด์ธ์.
Observable.just(1, 2, 3)
.map { it + 1 }
.filter { it < 3 }
.subscribe()
์ด ๊ฒฝ์ฐ ์์์ ์๋๋ก ์ด๋ํ๋ฉด ๋ค์๊ณผ ๊ฐ์ธ ์ผ์ด ์ผ์ด๋ฉ๋๋ค.
- ObservableJust ๊ฐ์ฒด๊ฐ ์์ฑ๋ฉ๋๋ค.
- ObservableMap ๊ฐ์ฒด๊ฐ ์์ฑ๋๊ณ ์ด์ ์ ์์ฑ๋ ObservableJust๋ฅผ ์ Observable์ ์ ๋ฌํฉ๋๋ค. (๊ทธ๋์ ์กฐํฉ์ด ๋ฉ๋๋ค.)
- ObservableFilter ๊ฐ์ฒด๊ฐ ์์ฑ๋๊ณ ์ด์ ์ ์์ฑ๋ (ObservableJust๊ฐ ๋ด์ฅ๋) ObservableMap๋ฅผ ์ Observable์ ์ ๋ฌํฉ๋๋ค.
- ObservableFilter๋ฅผ ๊ตฌ๋ ํ์ฌ ์ค์ ๊ตฌ๋ (subscription)์ ์๋ํฉ๋๋ค.
- ObservableFilter๋ ๋ด๋ถ์ ์์ฒด ์ต์ ๋ฒ๋ฅผ ๋ง๋ค์ด ObservableMap์ ๊ตฌ๋ ํฉ๋๋ค.
- ObservableMap์ ๋ด๋ถ์ ์์ฒด ์ต์ ๋ฒ๋ฅผ ๋ง๋ค์ด ObservableJust๋ฅผ ๊ตฌ๋ ํฉ๋๋ค.
- ObservableJust๋ onSubscribe ์ด๋ฒคํธ๋ฅผ ๋ค์ด์คํธ๋ฆผ์ผ๋ก ๋ณด๋ ๋๋ค. (๋ค๋ฅธ ์ต์ ๋ฒ๋ธ๋ค๋ ์ด ์ด๋ฒคํธ๋ฅผ ์ฒด์ธ์ ์ต์ ์ต์ ๋ฒ๋ธ๊น์ง ๋ค์ด์คํธ๋ฆผ์ผ๋ก ๋ณด๋ ๋๋ค.)
- ObservableJust๋ ๊ฐ์ ๋ฐํํ๊ณ
onNext
์ฝ๋ฐฑ์ ํตํด ๋ค์ด์คํธ๋ฆผ์ผ๋ก ์ ๋ฌํฉ๋๋ค.
์ด ์งง์ Rx ์ฒด์ธ์์ ๊ฝค ๋ง์ ์ผ์ด ์ผ์ด๋๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค.
์ด ์ฒด์ธ์ด Flowable ํ์
์ด์๋ค๋ฉด request(n)
์ ์ถ๊ฐ ํต์ ์ด ๋ฐ์ํด ๋ ๋ณต์กํด์ง๋๋ค.
์ฐ์ฐ์ ์์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํ ๋ด๋ถ ํ๊ฐ ์์ต๋๋ค. ํ์ ๋ํ ์ ๊ทผ์ ์ง๋ ฌํ๋์ด์ผ ํฉ๋๋ค. (์ด๋ ์ ์ ํ ๋๊ธฐํ๋ฅผ ํตํด์ ์ ๊ทผํด์ผํ๋ค๋ ๊ฒ์ ๋๋ค.)
RxJava2๋ (์ฑ๋ฅ์ ์ํด) ์์ ๊ฐ์ฒด(Atomic object, ์: AtomicInteger)์ compareAndSet ๋ฉ์๋๋ฅผ ์ด์ฉํ ๋ฌดํ ๋ฐ๋ณต์ ๊ธฐ๋ฐํ ๋ธ๋กํน ์๋ ๋๊ธฐํ๋ฅผ ๊ฐ์ง๊ณ ์์ต๋๋ค. ๋ผ์ด๋ธ๋ฆฌ๋ฆฌ์์ ์ผ๋ฐ์ ์ผ๋ก ๋ค์๊ณผ ๊ฐ์ ์ฝ๋๋ฅผ ๋ณผ ์ ์์ต๋๋ค.
for (; ; ) {
long r = state.get();
if ((r & COMPLETED_MASK) != 0L) {
return;
}
long u = r | COMPLETED_MASK;
// (active, r) -> (complee, r) ์ ํ
if (state.compareAndSet(r, u)) {
// ๋ง์ฝ ์์ฒญ๋ ์์ด 0์ด ์๋๋ฉด ํ๋ฅผ ๋น์๋๋ค.
if (r != 0L) {
postCompleteDrain(u, actual, queue, state, isCancelled);
}
return;
}
}
์ฒด์ธ์ ๊ฐ ์ฐ์ฐ์๊ฐ ๊ฐ์ ํ๋ฅผ ๊ฐ์ง๋ค๊ณ ํ๋ฉด ์์ ๊ฐ์ฒด๋ฅผ ๊ฐ์ง ์ฐ์ฐ์์ ํ๋ ๊ฐ์ ๋น์ฉ์ ๊ฐ์ง๋๋ค.
์์ ๋ชจ๋ ๊ฒ์ ๊ณ ๋ คํด RxJava๊ฐ ๊ฐ์ง ๋ฌธ์ ๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
- ์กฐ๋ฆฝ ์๊ฐ ๊ฐ์ ๋น์ฉ - Rx ์ฒด์ธ์ ์์ฑํ๋ฉด ๊ฝค ๋ง์ ๊ฐ์ฒด๋ฅผ ๋ง๋ค์ด์ผ ํ๊ณ ๋ฉ๋ชจ๋ฆฌ ๊ฐ์ ๋น์ฉ๋ฅผ ์ ๋ฐํฉ๋๋ค.
- ๊ตฌ๋ ์๊ฐ ๊ฐ์ ๋น์ฉ - Rx ์ฒด์ธ์ ๊ตฌ๋ ํ๊ณ ๋ง์ ํต์ ์ด ์ด๋ฃจ์ด์ง๋ฉด ์ฑ๋ฅ ์์ ๊ฐ์ ๋น์ฉ์ด ๋ฐ์ํฉ๋๋ค.
- ํ ๋น๊ณผ ์ง๋ ฌํ ๊ฐ์ ๋น์ฉ - ๋งค ์ฐ์ฐ์๋ง๋ค ํ์ ์์ ๊ฐ์ฒด๊ฐ์ ๋ด๋ถ ๊ตฌ์กฐ๋ฅผ ๋ง๋๋ ๊ฒ์ ๋ฉ๋ชจ๋ฆฌ์ ์ฑ๋ฅ์ ๊ฐ์ ๋น์ฉ์ ์ ๋ฐํฉ๋๋ค.
์ฑ๋ฅ๊ณผ ๋ฉ๋ชจ๋ฆฌ ๋ฌธ์ ๋ฅผ ํ๊ธฐ ์ํด ์ฐ์ฐ์ ๊ฒฐํฉ์ด ์์ต๋๋ค.
๋๊ฐ์ง ํํ์ ๊ฒฐํฉ์ด ์์ต๋๋ค.
- ๋งคํฌ๋ก ๊ฒฐํฉ (Macro fusion) - ๋ช ๊ฐ์ ์ฐ์ฐ์ ํ๋๋ก ํฉ๋ณํ์ฌ ์กฐํฉ๊ณผ ๊ตฌ๋ ๋์ ์์ฑ๋๋ ๊ฐ์ฒด์ ์๋ฅผ ์ต์ํ ํฉ๋๋ค.
- ๋ง์ดํฌ๋ก ๊ฒฐํฉ (Micro fusion) - ์ฐ์ฐ์ ์ฌ์ด์ ๋ถํ์ํ ๋๊ธฐํ์ธ (ํ์ ๊ฐ์) ๊ณต์ ๋ด๋ถ ๊ตฌ์กฐ๋ฅผ ์ ๊ฑฐํฉ๋๋ค.
์กฐ๋ฆฝ์์ ๋งคํฌ๋ก ๊ฒฐํฉ์ Observable์ ์์ ์กฐํฉ ๋ ์์ฑ๋ ๊ฐ์ฒด๋ฅผ ์ต์ํํ๋ ๋ฐ ์ง์คํฉ๋๋ค. "์กฐ๋ฆฝ"์ ์ด๋ฐ ๊ฒ์ ์ด์ผ๊ธฐ ํฉ๋๋ค.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
Observable๋ค์ ์ต์ ํํ๋ ๋จ์ํ ๋ฐฉ๋ฒ์ ํน๋ณํ ๊ฒฝ์ฐ์ ๋ํ ๊ฒ์ฌ๋ฅผ ์ถ๊ฐํ์ฌ, ์ผ๋ฐ์ ์ธ Observable ๋ณด๋ค ๋จ์ํ Observable์ ์์ฑํ๋ ๊ฒ์ ๋๋ค.
์๋ฅผ ๋ค์ด Observable.fromArray
๋ฅผ ์ดํด๋ด
์๋ค. ์ด๋ ํญ๋ชฉ์ ๊ฐ์๊ฐ 0์ด๋ 1์ด๋ฉด ๊ฐ๊ฐ Observable.empty
๋๋ Observable.just
๋ก ๋ค์ด๊ทธ๋ ์ด๋ํ ์ ์์ต๋๋ค.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
if (items.length == 0) {
return empty();
} else if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
๊ฒฐํฉ(fuseable) ํจํค์ง์ ์ฒซ "๊ณ ๊ธ" ์ต์ ํ๋ ScalarCallable ์ธํฐํ์ด์ค์ ๋๋ค.
public interface ScalarCallable<T> extends Callable<T> {
// ์์ธ๋ฅผ ๋ด์ง ์๋๋ก ์ค๋ฒ๋ผ์ด๋ํฉ๋๋ค.
@Override
T call();
}
๊ธฐ๋ณธ์ ์ผ๋ก ์๋ฐ์ Callable ์ธํฐํ์ด์ค์ ๊ธฐ๋ณธ์ ์ผ๋ก๋ ๋์ผํ๋ฐ ์์ธ๋ฅผ ๋ด์ง ์๋ ์ฐจ์ด๊ฐ ์์ต๋๋ค.
ScalarCallable๋ ๋ฐ์ํ ํ์ ์ ์กฐ๋ฆฝ ์๊ฐ ๋์ ์์ ํ๊ฒ ์ถ์ถํ ์ ์๋ ์์๊ฐ์ ๊ฐ์ง๊ณ ์๋ ์ธํฐํ์ด์ค์ ๋๋ค. ํนํ ์ด๋ฐ ๋ฐ์ํ ํ์ ์ ์ ํํ ๊ฐ ํ๋๋ฅผ ๊ฐ์ง๊ฑฐ๋ ์์ ํ ํญ๋ชฉ ์์ฒด๊ฐ ์์ต๋๋ค.
๊ทธ๋์ ์ฐ๋ฆฌ๊ฐ call
๋ฉ์๋๋ฅผ ํธ์ถํ ๋, ๋ฐํ ๊ฐ์ด null์ด์๋ค๋ฉด ๋ฐ์ํ ํ์
์ ์ด๋ค ๊ฐ๋ ๊ฐ์ง์ง ์์ผ๋ฉฐ, null์ด ์๋ ๊ฐ์ด ์๋ค๋ฉด ํ๋์ ๊ฐ๋ง ๊ฐ์ง๋๋ค.
์ค๋ช ๋ ์ ์ฝ์ ๊ธฐ๋ฐ์ผ๋ก Observable, Floawable, Maybe์ empty, just ์ฐ์ฐ์๋ง์ด ์ด ์ธํฐํ์ด์ค๋ก ํ์๋ ์ ์์ต๋๋ค.
xMap ์ฐ์ฐ์(flatMap, switchMap, contactMap)๋ก ์๋ฅผ ๋ค๊ฒ ์ต๋๋ค. ์์ค๊ฐ ์ด ์ธํฐํ์ด์ค๋ก ํ์๋ ์ ์๋ค๋ฉด ์ด ์ต์ ํ๋ฅผ ์ ์ฉํ ์ ์์ต๋๋ค.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
if (this instanceOf ScalarCallable) {
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatmap<T, R>(this, ...));
}
์ด ๊ฒฝ์ฐ์ ์์ค๊ฐ ScalarCallable๋ก ํ์๋์๋ค๋ฉด (๊ฝค ๋ฌด๊ฑฐ์ด) ์ ์ฒด ๊ตฌํ ๋์ ์ xMap์ ๋จ์ํ๋ ๋ฒ์ ์ผ๋ก ๋ณํํ ์ ์์ต๋๋ค.
fuseable
ํจํค์ง์ ์ธ๊ฐ์ง ์ธํฐํ์ด์ค๊ฐ ์์ต๋๋ค.
public interface FuseToObservable<T> {
Observable<T> fuseToObservable();
}
public interface FuseToFlowable<T> {
Floawable<T> fuseToFlowable();
}
public interface FuseToMaybe<T> {
Maybe<T> fuseToMaybe();
}
FuseToObservable
์ ์ดํด๋ด
์๋ค. ๋ค๋ฅธ ์ธํฐํ์ด์ค๋ค๋ ์ ๋ถ ๋น์ทํฉ๋๋ค.
๋ค์ Rx ์ฒด์ธ์ ๊ณ ๋ คํด๋ด ์๋ค.
Observable.range(1, 10)
.count()
.toObservable()
.subscribe()
์ฌ๊ธฐ์ range๋ฅผ ๋ง๋ค๊ณ ๋ด๋ณด๋ธ ํญ๋ชฉ์ ๊ฐฏ์๋ฅผ ์ธ์ด๋ด
์๋ค. count
์ฐ์ฐ์๋ Single
์ ๋ฐํํฉ๋๋ค. ํ์ง๋ง ์ฐ๋ฆฌ๋ Observble์ ๋ฐ๊ธธ ์ํ๊ณ ์. (๋ค๋ฅธ Observable๊ณผ ์ด ๊ฒฐ๊ณผ๋ฅผ ๋จธ์งํด๋ ์๋ก ๋ค ์ ์์ต๋๋ค.) Rx ์ฒด์ธ์ ์ถ๊ฐ์ ์ธ toObservable
์ฐ์ฐ์๋ฅผ ์ถ๊ฐํด ๋ ๋ณด์กํ๊ณ ๋ฌด๊ฑฐ์์ก์ต๋๋ค.
FuseToObservable์ ์ฌ๊ธฐ์ ๋์์ด ๋ฉ๋๋ค. ์ด ์ธํฐํ์ด์ค๊ฐ ํ๋ ์ผ์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. Observable์ด ์๋ ๋ฐ์ํ ํ์
์ ๋ฐํํ๋ ๋ช๋ช ์ฐ์ฐ์๋ค์ ๋ด๋ถ์ Observable์ ๋ฐํํ๋ ๊ตฌํ์ ๊ฐ์ง๊ณ ์๊ณ ์ด ๊ตฌํ์ toObservable
ํธ์ถ์์ ์ถ์ถํ ์ ์๋ค๋ ๊ฒ์
๋๋ค.
count
์์ ๋ฅผ ์ดํด๋ด
์๋ค.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public interface Single<Long> count() {
return RxJavaPlugins.onAssembly(new ObservableCountSingle<T>(this));
}
๊ธฐ๋ณธ์ ์ผ๋ก ObservableCountSingle
์ ๋ฐํํฉ๋๋ค. ํ์ง๋ง ์ด ์ฐ์ฐ์๋ฅผ ์กฐ๊ธ ๋ ๊น๊ฒ ์ดํด๋ณด๋ฉด ์ด ๊ตฌํ์ด FuseToObservable
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๊ณ ๊ฒฐํฉ ๋ชจ๋๋ก ๋ถ๋ฆด ๋ ๋ค๋ฅธ ๊ตฌํ์ ์ ๊ณตํ๋ ๊ฒ์ ์ ์ ์์ต๋๋ค.
public interface class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
...
@Override
public Observable<Long> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
}
}
๊ทธ๋ฆฌ๊ณ toObservable
์ ํธ์ถํ๋ฉด ๊ทธ ๊ตฌํ์ด ์ถ์ถ๋๊ณ toObservable
์ ์ํ Observable์ด ์์ฑ๋์ง ์์ ํจ์จ์ ์
๋๋ค.
public final Observable<T> toObservable() {
if (this instanceOf FuseToObservable) {
return ((FuseToObservable<T>)this).fuseToObservable();
}
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}
๊ตฌ๋
์ ๋งคํฌ๋ก ๊ฒฐํฉ์ ์กฐ๋ฆฝ์ ์ด๋ฃจ์ด์ง๋ ๊ฒ๊ณผ ๋์ผํ ์ต์ ํ์ ์ค์ ์ ๋์ง๋ง subscribeActual
๋ฉ์๋ ๋ด์์ ์ํ๋ฉ๋๋ค.
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
๊ตฌ๋ ์ ์ ๋ฐ์ดํฐ๊ฐ ๋ถํ์คํ๊ธฐ ๋๋ฌธ์ ์กฐ๋ฆฝ ์์ ์ ์ต์ ํ๊ฐ ๋ถ๊ฐ๋ฅํ ์ ์์ต๋๋ค. ์กฐ๋ฆฝ ๋ณด๋ค ๊ตฌ๋ ๋ ์ต์ ํ๋ฅผ ํ๋ ๊ฒ์ด ๋ ํธ๋ฆฌํ ์ ์์ต๋๋ค.
์กฐ๋ฆฝ๋์ ๋ง์ฐฌ๊ฐ์ง๋ก ์ด๋ค ํน์ํ ์กฐ๊ฑด์ ํ์ธํ์ฌ ์ผ๋ฐ์ ์ด์ง ์๋ ๋จ์ํ ๊ตฌํ์ ์ฌ์ฉํ๋ ๊ฐ๋จํ ์ต์ ํ๊ฐ ์์ต๋๋ค. ์๋ฅผ๋ค์ด Observable.amb๋ ์ ๊ณต๋ ์์ค์ ๊ฐฏ์๋ฅผ ํ์ธํ์ฌ ๋ฌด๊ฑฐ์ด AmbCoordinator๋ฅผ ์ธ์คํด์คํํ ์ง ๋ง์ง ๊ฒฐ์ ํฉ๋๋ค.
public void subscribeActual(Observer<? super T> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
...
if (count == 0) {
EmptyDisposable.complete(observer);
return;
} else if (count == 1) {
sources[0].subscribe(observer);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
ac.subscribe(sources);
}
์กฐ๋ฆฝ๋์ ScalarCallable ์ธํฐํ์ด์ค๋ก ๋ช๊ฐ์ง ์ต์ ํ๋ฅผ ํฉ๋๋ค. ๊ตฌ๋ ํ ๋๋ Callable ์ธํฐํ์ด์ค๋ฅผ ์ด์ฉํ ๋น์ทํ ์ต์ ํ๊ฐ ์์ต๋๋ค.
์ฃผ์: ScalarCallable์ด Callable์ ํ์ฅํ ๊ฒ ์ฒ๋ผ - ScalarCallable๋ก ์กฐ๋ฆฝ ๋ ์ ์ฉํ ์ต์ ํ๋ค์ ๊ตฌ๋ ๋ Callable์ ์ ์ฉํ ์ ์์ต๋๋ค.
Callable ์ธํฐํ์ด์ค๋ก ํ์๋ Observable์ ๋ํ ๊ตฌ๋ ํ ๋ xMap ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํ ๋๋ ๋จ์ํ๋ ๊ตฌํ์ผ๋ก ๋์ฒดํ ์ ์์ต๋๋ค.
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
๋ง์ดํฌ๋ก ๊ฒฐํฉ์ ์ด๋ค ๋๊ธฐํ์ ํ์ ๊ฐ์ ๊ณต์ ๋ด๋ถ ๊ตฌ์กฐ๋ฅผ ์ค์ฌ ๊ฐ์ ๋น์ฉ์ ์ต์ํํ๋ ๊ฒ์ ๋ชฉํ๋ก ํ๋ค.
Flowable.filter
์ ๊ฐ์ ์ฐ์ฐ์๊ฐ ์ฌ์ฉ๋ ๋ ์ด๋ค ์ผ์ด ์ผ์ด๋๋์ง๋ฅผ ์ดํด๋ณด์.
์ ์คํธ๋ฆผ, filter ์ฐ์ฐ์, ๋ค์ด์คํธ๋ฆผ์ ๊ฐ์ง๊ณ ์์ต๋๋ค. ๊ฐ์ด 5๋ณด๋ค ์์์ง ๊ฒ์ฌํ๋ ํํฐ๊ฐ ์๋ค๊ณ ๊ฐ์ ํฉ์๋ค. ๊ตฌ๋ ์ด ๋๋ฉด ๋ค์ด์คํธ๋ฆผ์ ์ ์คํธ๋ฆผ์๊ฒ ๋ช๊ฐ์ ํญ๋ชฉ์ ์์ฒญํด์ผ ํฉ๋๋ค.
- ํํฐ์๊ฒ ๋ค์ด์คํธ๋ฆฝ์ 1๊ฐ์ ํญ๋ชฉ์ ์์ฒญํฉ๋๋ค.
- ํํฐ๋ ์ ์คํธ๋ฆผ์๊ฒ 1๊ฐ์ ํญ๋ชฉ์ ์์ฒญํฉ๋๋ค.
- ์ ์คํธ๋ฆผ์ ํญ๋ชฉ์ ์์ฑํ๊ณ ํํฐ๋ก ์ ๋ฌํฉ๋๋ค. (์ซ์๊ฐ 1์ด๋ผ๊ณ ํฉ์๋ค.)
- ํํฐ๋ ๊ฐ์ด ์ ์ด(predicate)๋ฅผ ๋ง์กฑํ๋์ง ํ์ธํ์ฌ ๋ค์ด์คํธ๋ฆผ์ ์ ๋ฌํฉ๋๋ค.
- ๋ค์ด์คํธ๋ฆผ์ ํญ๋ชฉ์ ๋ฐ์๋ค์ด๊ณ ํํฐ์๊ฒ ํ๋ ๋ ์์ฒญํฉ๋๋ค.
- ํํฐ๋ ์ ์คํธ๋ฆผ์๊ฒ 1๊ฐ์ ํญ๋ชฉ์ ์์ฒญํฉ๋๋ค.
- ์ ์คํธ๋ฆผ์ ํญ๋ชฉ์ ๋ง๋ค๊ณ ํํฐ์๊ฒ ์ ๋ฌํฉ๋๋ค. (์ซ์๊ฐ 10์ด๋ผ๊ณ ํฉ์๋ค.)
- ํํฐ๋ ๊ฐ์ด ์ ์ด๋ฅผ ๋ง์กฑํ์ง ๋ชปํ ๊ฒ์ ํ์ธํ๊ณ ๋ค์ด์คํธ๋ฆผ์๊ฒ๋ ์ ๋ฌํ์ง ์์ต๋๋ค. ๋ค์ด์คํธ๋ฆผ์ด ํ๋์ ํญ๋ชฉ์ ์์ฒญํ์์ง๋ง ํํฐ๊ฐ ์ ๊ณตํ์ง ์์๊ณ ๊ทธ๋์ ํํฐ๋ ์ ์คํธ๋ฆผ์๊ฒ ํ๋ ๋ ์์ฒญํฉ๋๋ค.
- ์คํธ๋ฆผ์ด ์ข ๋ฃ๋ ๋๊น์ง ์ด ์์ ์ ๋ฐ๋ณต๋ฉ๋๋ค.
์ฐ์ฐ์ ๊ฐ ๋ง์ ํต์ ์ด ์๋ ๊ฒ์ ๋ณผ ์ ์๊ณ ๋ ์ค์ํ ๊ฒ์ ๊ฐ ์ด๋ฒคํธ๊ฐ ์ง๋ ฌํ ๋ ๋ฐฉ์์ผ๋ก ์ ๊ณต๋๊ธฐ ๋๋ฌธ์ ์ฝ๊ฐ์ ๊ฐ์ ๋น์ฉ์ด ์์ต๋๋ค.
๊ทธ ์ฌ์ด์ ๋๊ฐ์ ์ฐ์ฐ์๊ฐ ์๋ค๊ณ ๊ฐ์ ํฉ์๋ค. ํต์ ์ ์๋นํ ๊ฐ์ ๋น์ฉ์ ์ฆ๊ฐ์ํฌ ์ ์์ต๋๋ค.
ConditionalSubscriber๋ ์ด ์ํฉ์ ๊ตฌํฉ๋๋ค.
public interface ConditionalSubscriber<T> extends FlowableSubscriber<T> {
boolean tryOnNext(T t);
}
์ผ๋ฐ์ ์ผ๋ก Subscriber์ onNext
์ฝ๋ฐฑ์ ์๋ฌด ๊ฐ๋ ๋ฐํํ์ง ์์ต๋๋ค. ์
์คํธ๋ฆผ์ ๋จ์ํ ๊ฐ์ ์ฝ๋ฐฑ์ ํตํด ์ ๊ณตํ๊ณ ๋ค์ด์คํธ๋ฆผ์ ์๋ก์ด ์์ฒญ์ ๊ธฐ๋ค๋ฆฝ๋๋ค. ConditionalSubscriber๋ ์ถ๊ฐ์ ์ธ tryOnNext
๋ฉ์๋๋ฅผ ๊ฐ์ง๋๋ค. ์ด๋ onNext
์ ๋น์ทํ๋ฐ ๊ฐ์ ๋ฐ์ ์ ์๋๋ ๊ฑฐ์ ๋์๋์ ๋ฐ๋ผ ๋ถ๋ฆฌ์ธ ๊ฐ์ ์ฆํ ๋ฐํํฉ๋๋ค. ์ด๋ ์
์คํธ๋ฆผ์ด ์ง์ ์๋ต์ ์์ ํ ๋ request(n)
ํธ์ถ์ด ์์ฒญ๋๋ ํ์๋ฅผ ์ค์ผ ์ ์์ต๋๋ค.
`Flowable.filter' ๊ตฌํ์ ์๋ฅผ ์ดํด๋ณด๋ฉด ์ ์คํธ๋ฆผ filter๊ฐ ์ง์ ๋ค์ด์คํธ๋ฆผ filter์ ์ ์ด๋ฅผ ์ ๊ทผํ๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค.
@Override
public boolean tryOnNext(T t) {
...
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
return b && downstream.tryOnNext(t);
}
์ด๋ ๋ช ์ฐจ๋ก์ ์์ฒญ์ ์ ์ฝํฉ๋๋ค.
์ต์ ํ๊ฐ ์ฐ์ ํํฐ ์ฐ์ฐ์์ ๊ฐ์ ๋น์ฉ์ ์ค์ด๊ณ ์ ํ๋๊ฒ ๋ชฉํ๋ผ๋ฉด ๊ทธ๋ค์ง ๋๋์ง๋ ์์ ๊ฒ์
๋๋ค. (์ด์จ๋ ํ๋์ ํํฐ ์ฐ์ฐ์๋ก ์ธ ์ ์๋ ๊ฒ ์ฒ๋ผ ๋ณด์ด๋๊น์.) ์ข์ ์ ์ Flowable.map
์ญ์ ConditionalSubscriber๋ก ๊ตฌํํ ์ ์๊ณ ์ฌ๋ฌ ํํฐ์ ๋งต ์ฐ์ฐ์๊ฐ ์ฐ์์ ์ผ๋ก ํจ๊ป ์๋ค๋ฉด ๊ฐ์ ๋น์ฉ์ ๋ ์ค์ผ ์ ์์ต๋๋ค.
@Override
public boolean tryOnNext(T t) {
...
U v;
tru {
v = ObjectHelper.requireNonNull(mapper.apply(t), ...);
} catch (Throwable ex) {
fail(ex);
}
}
๊ฐ์ฅ ๋ณต์กํ ๋ง์ดํฌ๋ก ๊ฒฐํฉ์ ์ฐ์ฐ์ ์ฌ์ด์ ๊ณต์ ๋ด๋ถ ํ๋ค์ ๊ธฐ๋ฐํฉ๋๋ค. ์ ์ฒด ์ต์ ํ๋ QueueSubscription
์ธํฐํ์ด์ค์ ๊ธฐ๋ฐํฉ๋๋ค.
public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}
๊ธฐ๋ณธ์ ์ผ๋ก Queue์ Subscription์ด ํ ์ธํฐํ์ด์ค ์๋ ์์ต๋๋ค. ํ์ง๋ง ํ ์ธํฐํ์ด์ค๋ ์๋ฐ์ ๊ฐ๋จํ ์ธํฐํ์ด์ค๊ฐ ์๋๋๋ค. ๋์ ์ถ๊ฐ์ ์ธ ๋ฉ์๋ requestFusion
์ ๊ฐ์ง๋๋ค.
public interface QueueFuseable<T> extends SimpleQueue<T> {
int requestFusion(int mode);
int NONE = 0;
int SYNC = 1;
int ASYNC = 2;
int ANY = SYNC | ASYNC;
int BOUNDARY = 4;
}
Flowable๊ณผ Subscriber ์ฌ์ด์ onXXX
์ฝ๋ฐฑ์ ์ฌ์ฉํ๋ ์ผ๋ฐ์ ์ธ ์ปค๋ฎค๋์ผ์ด์
๊ณผ ๋น๊ตํด์, ์
์คํธ๋ฆผ์ Subscription ๋ฟ๋ง ์๋๋ผ QueueSubscription๋ฅผ ์ ๊ณตํ ์ ์์ต๋๋ค. QueueSubscription์ ์ง์ ์ ์ผ๋ก ๋ด๋ถ์ ํ์ ์ ๊ทผํ ์ ์์ต๋๋ค.
๋์๋ฐฉ์์ ๋ค์๊ณผ ๊ฐ์ต๋๋ค. ๋จผ์ onSubscribe ๋์ ์
์คํธ๋ฆผ๊ณผ ๋ค์ด์คํธ๋ฆผ์ ๊ฒฐํฉ์ ํฉ์ํ๊ณ ์๋ํ ๊ฒฐํฉ ๋ชจ๋(fusion mode)๋ฅผ ์ ํํฉ๋๋ค. ๊ฒฐํฉ์ด ํฉ์๋๋ฉด ์๋ก์ด ํต์ ๊ตฌํ์ด ์ฌ์ฉ๋ฉ๋๋ค. ๊ทธ๋ ์ง ์๋ค๋ฉด onXXX
์ฝ๋ฐฑ์ ์ฌ์ฉํ๋ ์ ํต์ ์ธ ํต์ ์ด ์๋ฆฝ๋ฉ๋๋ค.
์ผ๋ฐ์ ์ผ๋ก ๊ฒฐํฉ์ด ์๋ฆฝ๋๋ฉด ๋ค์ด์คํธ๋ฆผ์ ์
์คํธ๋ฆผ ํ์ poll()
๋ฉ์๋๋ฅผ ์ง์ ํธ์ถํ์ฌ ํญ๋ชฉ์ ์ป์ ๊ฒ์
๋๋ค.
์ธ๊ฐ์ง ๊ฒฐํฉ ๋ชจ๋๊ฐ ์์ต๋๋ค.
- NONE - ๊ฒฐํฉ์ ํ์ง ์๋ ๊ฒ์ ๋๋ค.
- SYNC - ๋๊ธฐ์ ์ธ ๊ฒฐํฉ์ด ์ด๋ฃจ์ด์ง๋๋ค.
- ASYNC - ๋น๋๊ธฐ์ ์ธ ๊ฒฐํฉ์ด ์ด๋ฃจ์ด์ง๋๋ค.
ANY๋ ๋จ์ํ SYNC์ด๊ฑฐ๋ ASYNC์ ๋๋ค. (์ด๋ ์ ์คํธ๋ฆผ์ด ๋ฌด์์ ์ง์ํ๋์ ๋ฐ๋ผ ์๋ฆฝ์ด ๊ฒฐ์ ๋ฉ๋๋ค.)
์ ์คํธ๋ฆผ์ ๊ฐ์ด ์ด๋ฏธ ์ ์ ์ด๊ฑฐ๋ poll()์ด ํธ์ถ๋ ๋ ๋๊ธฐ์ ์ผ๋ก ์์ฑ๋๋ค๋ฉด SYNC ๊ฒฐํฉ์ ์ฌ์ฉํ ์ ์์ต๋๋ค.
๋ง์ฝ ์ ์คํธ๋ฆผ๊ณผ ๋ค์ด์คํธ๋ฆผ์ด ๋๊ธฐ ๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ฌ์ฉํ๊ธฐ๋ก ํฉ์๋๋ฉด ์๋ก์ ๊ณ์ฝ์ ๋ฐ๋ฆ ๋๋ค.
- ๋ค์ด์คํธ๋ฆผ์ ํ์์ ๋ฐ๋ผ ์ง์
poll()
๋ฉ์๋๋ฅผ ํธ์ถํฉ๋๋ค. poll()
๋ฉ์๋๋ ์์ธ๋ฅผ ๋์ง ์ ์์ต๋๋ค. ์ด๋onError
์ ๋์ผํฉ๋๋ค.poll()
๋ฉ์๋๋ null์ ๋ฐํํ ์ ์์ต๋๋ค. ์ด๋onComplete
์ ๋์ผํฉ๋๋ค.poll()
๋ฉ์๋๋ null์ด ์๋ ๊ฐ์ ๋ฐํํ ์ ์์ต๋๋ค. ์ด๋onNext
์ ๋์ผํฉ๋๋ค.- ์
์คํธ๋ฆผ์ ์ด๋ค
onXXX
์ฝ๋ฐฑ๋ ํธ์ถํ์ง ์์ต๋๋ค.
SYNC ๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ง์ํ๋ ์ฐ์ฐ์์ ์๋ Flowable.range
์
๋๋ค.
@Override
public final int requestFusion(int mode) {
return mode & SYNC;
}
@Nullable
@Override
public final Integer poll() {
int i = index;
if (i == end) {
return null;
}
index = i + 1;
return i;
}
poll()์ ํธ์ถํ ๋ ๋์ค์(eventually) ์ ์คํธ๋ฆผ์ ๊ฐ์ด ์ฌ์ฉ๊ฐ๋ฅํด์ง๋ ASYNC ๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
์ ์คํธ๋ฆผ๊ณผ ๋ค์ด์คํธ๋ฆผ์ด ๋น๋๊ธฐ ๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ฌ์ฉํ๊ธฐ๋ก ํฉ์๋๋ฉด ์๋์ ๊ณ์ฝ์ ๋ฐ๋ฆ ๋๋ค.
- ์
์คํธ๋ฆผ์ ์ผ๋ฐ์ ์ผ๋ก
onError
์onComplete
๋ฅผ ๋ณด๋ ๋๋ค. - onNext์ ์ ์คํธ๋ฆผ์ ๊ฐ ๋ฟ ์๋๋ผ null์ ๋์ ๊ฐ์ง ์ ์์ต๋๋ค. ๋ค์ด์คํธ๋ฆผ์ ์ด onNext๋ฅผ poll()์ ํธ์ถํ ์ ์๋ ํ์์ผ๋ก ์ทจ๊ธํ ์ ์์ต๋๋ค.
- poll()์ ํธ์ถ์๋ ์์ธ๋ฅผ ์ฒ๋ฆฌํด์ผ ํฉ๋๋ค.
๋ง์ต๋๋ค. RxJava์์ onNext๋ null๊ฐ์ ๊ฐ์ง ์ ์์ต๋๋ค.
ASYNC ๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ง์ํ๋ ์ฐ์ฐ์์ ์๋ Flowable.filter
์
๋๋ค.
public T poll() throws Exception {
QueueSubscription<T> qs = this.qs;
Predicate<? super T> f = filter;
for (;;) {
T t = qs.poll();
if (t == null) {
return null;
}
if (f.test(t)) {
return t;
}
if (sourceMode == ASYNC) {
qs.request(1);
}
}
}
@Override
public boolean tryOnNext(T t) {
if (done) {
return false;
}
if (sourceMode != NONE) {
return downstream.tryOnNext(null);
}
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
return b && downstream.tryOnNext(t);
}
๊ฒฐํฉ ๋ชจ๋๋ฅผ ์ง์ํ๋ ๋ช๊ฐ์ ์ฐ์ฐ์๋ฅผ ์ดํ์ง๋ง ์ด ๋ชจ๋๋ฅผ ํ์ฑํํ๊ธฐ ์ํด ๋จผ์ ๊ฒฐํฉ์ ์์ฒญํด์ผ ํฉ๋๋ค. Flowable.flatMap
๋ด์์ ๊ฒฐํฉ ๋ชจ๋๊ฐ ์์ฒญํ๋ ์์
๋๋ค.
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
if (s instanceOf QueueSubscription) {
QueueSubscription<U> qs = (QueueSubscription<U>) s;
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
if (m == QueueSubscription.SYNC) {
fusionMode = m;
queue = qs;
done = true;
paent.drain();
}
if (m == QueueSubscription.ASYNC) {
fusionMode = m;
queue = qs;
}
}
s.request(bufferSize);
}
}
์์ค๊ฐ QueueSubscription
์ ๊ตฌํํ๋ค๋ฉด ๊ตฌ๋
๋์ ๊ฒฐํฉ ๋ชจ๋ ANY๊ฐ ์์ฒญ๋๋ ๊ฒ์ ๋ณผ ์ ์์ต๋๋ค. ์์ค์ ์ํด ์๋ฝ๋ ๋ชจ๋์ ๋ฐ๋ผ ๋ค๋ฅธ ์ ๋ต์ด ์ ์ฉ๋ฉ๋๋ค.
map ์์์ ๋ฌด๊ฑฐ์ด ์ฐ์ฐ์ด ์ด๋ฃจ์ด์ง ์ ์์ต๋๋ค. ์ด ๊ฒฝ์ฐ (์ง์ ํด๋ง์ผ ๊ฒฝ์ฐ) ํด๋น ๊ณ์ฐ์ด ๋ค๋ฅธ ์ค๋ ๋๋ก ์ ์ถ๋ ์ ์์ต๋๋ค. ์ด ๋ฌธ์ ๋ฅผ ํด๊ฒฐํ๊ธฐ ์ํด ์ถ๊ฐ์ ์ธ ๋ง์ปค ์ต์ BOUNDARY๊ฐ ์์ต๋๋ค. ์ด๋ poll ๋ฉ์๋์ ํธ์ถ์๊ฐ ๋ค๋ฅธ ์ค๋ ๋์ผ ์ ์์์ ๋ํ๋ ๋๋ค. ์ฐ์ฐ์๋ BOUNDARY ์ต์ ์ ๋ฌด์ํ ์ ์๊ณ , ๋ค๋ฅธ ์ค๋ ๋์์ ํ์ ์ ๊ทผ์ ํ์ฉํ ์๋, BOUNDARY ์ต์ ์ด ์์ฒญ๋ ๊ฒฝ์ฐ ๊ฒฐํฉ์ ๊ฑฐ๋ถํ ์ ์์ต๋๋ค.
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
(transitiveBoundaryFusion) ๋ด๋ถ๋ BOUNDARY ๋ชจ๋๋ฅผ ํ์ฉํ์ง ์์ต๋๋ค.
protected final int transitiveBoundaryFusion(int mode) {
QueueSubscription<T> qs = this.qs;
if (qs != null) {
if ((mode & BOUNDARY) == 0) {
int m = qs.requestFusion(mode);
if (m != NONE) {
sourceMode = m;
}
return m;
}
}
return NONE;
}
์ด ๊ธ์์ RxJava์ ๋ช๊ฐ์ง ์ต์ ํ์ ๊ฐ์๋ฅผ ์ดํด๋ณด๊ณ ๋ช๊ฐ์ง ํฅ๋ฏธ๋ก์ด ๊ฒ์ ๋ดค์ต๋๋ค.
- RxJava 2์ Observable์ ๋ฐฐ์์ ์ง์ํ์ง ์์ต๋๋ค. (๋ ์ด์์ ํญ๋ชฉ์ด ์๋ค๋ ๊ฑธ ์ ์คํธ๋ฆผ์ ์๋ฆด ๋ฐฉ๋ฒ์ด ์์ต๋๋ค.)
- ๋ช๊ฐ์ง ๋ด๋ถ ์ต์ ํ๊ฐ null ๊ฐ์ ์ฝ๋ฐฑ์ ์ ํ๋ ๊ฒ์ ๊ธฐ๋ฐํ๊ธฐ ๋๋ฌธ์ RxJava์์ null ๊ฐ์ ์ ๋ฌํ๋ ๊ฒ์ ์๋ฉ๋๋ค.
- ์ต์ ํ๋ฅผ ๋ชจ๋ ๋๊ณ ์ถ๋ค๋ฉด
hide()
์ฐ์ฐ์๋ ๋งค์ฐ ์ค์ํฉ๋๋ค. - ์ฐ์ฐ์ ๊ฒฐํฉ์ ํ๋ คํ์ง๋ง ์ฌ๋ฌ ์ด์ ํ ์ค์ ํ๋์ ๋ถ๊ณผํฉ๋๋ค. ๋ชจ๋ ์ฐ์ฐ์์์ ์ฌ์ฉํ ์ ์๋ ๊ฒ์ ์๋๋๋ค. ์ต์ ํ ๋ ๊ฒ ๊ฐ์ ๋ณด์ด์ง๋ง ๊ฒฝ์ฐ์ ๋ฐ๋ผ ์ด๋ค ์ต์ ํ๋ ์๋ํ์ง ์์ ๋๋ ์ ์์ต๋๋ค. ๊ทธ ์ด์ ๋ ์ด๋ฐํ ์ต์ ํ๊ฐ ์ผ๋ถ ์ค์ํ ๋ถ๋ถ๊ณผ ๊ณตํต ๋ฌธ์ ์ ์ ์ฉ๋๊ณ ์ผ๋ฐ์ ์ธ ์ต์ ํ๋ ๋งค์ฐ ์ด๋ ต๊ธฐ ๋๋ฌธ์ ๋๋ค.
๊ทธ๋์ RxJava ๋ด๋ถ์ ๋ชจ๋ ๊ฒ์ด ํจ์จ์ ์ด๋ค ์๊ฐํ๊ณ ๊ธด Rx ์ฒด์ธ์ ๋ง๋ค์ง ๋ง์ญ์์. ๋น์ ์ ์ฝ๋๋ฅผ ๋ฒค์น๋งํฌ ํ๊ณ ์ค์ํ ์ฒด์ธ์ ํ๋กํ์ผ๋งํด์ ๊ฐ๋ณ์ ์ผ๋ก ์ต์ ํํ ์ ์๋ ๋ฐฉ๋ฒ์ ์ฐพ์ผ์ธ์.
์๋์ ๊ธ์ ๊ฐ์ด ์ฝ์ผ์ธ์.
- https://akarnokd.blogspot.com/2016/03/operator-fusion-part-1.html
- https://akarnokd.blogspot.com/2016/04/operator-fusion-part-2-final.html
ํ๋ณตํ ์ฝ๋ฉํ์ธ์.