Skip to content

Instantly share code, notes, and snippets.

@dalinaum
Last active December 24, 2020 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dalinaum/a2e4f698cf7938041eb24c197266ac72 to your computer and use it in GitHub Desktop.
Save dalinaum/a2e4f698cf7938041eb24c197266ac72 to your computer and use it in GitHub Desktop.

RxJava 2์˜ ์—ฐ์‚ฐ์ž ๊ฒฐํ•ฉ (Operator fusion in RxJava 2) https://proandroiddev.com/operator-fusion-in-rxjava-2-dcd6612cffae

์ž‘์„ฑ: Vasya Drobushkov

์„œ๋ฌธ

RxJava๋Š” ๊ฐ•๋ ฅํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ง€๋งŒ ๋ช‡๊ฐ€์ง€ ๋ฌธ์ œ๋„ ์žˆ์Šต๋‹ˆ๋‹ค. ํŠนํžˆ ์„ฑ๋Šฅ๊ณผ ๋ฉ”๋ชจ๋ฆฌ ๋ฌธ์ œ๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๊ฐ€ ํ’€๋ ค๋Š” ๋ฌธ์ œ์™€ ๊ธฐ์ˆ ์ ์ธ ๊ด€์ ์—์„œ ์†”๋ฃจ์…˜์ด ์„ค๊ณ„๋˜๋Š” ๋ฐฉ์‹์—์„œ ๋น„๋กฏ๋ฉ๋‹ˆ๋‹ค.

RxJava๋Š” ๊ฐ„์ ‘๋น„์šฉ(overhead)๋ฅผ ์ตœ์†Œํ™”ํ•˜๊ธฐ ์œ„ํ•ด "์—ฐ์‚ฐ์ž ๊ฒฐํ•ฉ" (Operator fusion)์ด๋ผ ๋ถˆ๋ฆฌ๋Š” ์—ฌ๋ผ ๊ฐ€์ง€ ์ตœ์ ํ™”๋ฅผ ๊ฐ€์ง‘๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ์šฐ๋ฆฌ๋Š” ์ด ๊ธ€์—์„œ ๊ฑฐ๊ธฐ์— ๋Œ€ํ•ด ๋‹ค๋ฃฐ ์˜ˆ์ •์ž…๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ ๋จผ์ € RxJava ๋ฐ˜์‘ํ˜• ํƒ€์ž…์ด ์–ด๋–ป๊ฒŒ ๋™์ž‘ํ•˜๋Š”์ง€์™€ ๋ฌด์Šจ ๋ฌธ์ œ๊ฐ€ ์žˆ๋Š”์ง€ ์š”์•ฝํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

Observable

Observable๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ ํฌ๊ฒŒ ์„ธ๊ฐ€์ง€ ๋ถ€๋ถ„์ด ์žˆ์Šต๋‹ˆ๋‹ค. Observable, Observer, Disposable.

์šฐ๋ฆฌ ๋ชจ๋‘ Observable์„ ์•Œ๊ณ  ์žˆ๊ณ  ์–ด๋–ป๊ฒŒ ๋งŒ๋“œ๋Š”์ง€ ์••๋‹ˆ๋‹ค. (์˜ˆ: Observable.just("Hello, World!")) Observable์€ ๊ฐ Rx ์ฒด์ธ์˜ ์ƒ์„ฑ ๋ธ”๋ก์ž…๋‹ˆ๋‹ค.Observable์„ ์ž‘๋™ํ•˜๋ ค๋ฉด subscribe(...) ๋ฉ”์„œ๋“œ์— Observer๋ฅผ ์ „๋‹ฌํ•ด ๊ตฌ๋…ํ•ด์•ผํ•ฉ๋‹ˆ๋‹ค.

Observer๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ onSubscribe, onNext, onComplete ์ฝœ๋ฐฑ์„ ๊ฐ€์ง„ ์ธํ„ฐํŽ˜์ด์Šค์ž…๋‹ˆ๋‹ค.

Observer๊ฐ€ Observable์„ ๊ตฌ๋…ํ•˜๋ฉด (Observer๊ฐ€ ๋‚˜์ค‘์— ํ•„์š”ํ•  ๋•Œ Rx ์ฒด์ธ์„ ์—†์• ๊ธฐ(dispose) ์œ„ํ•ด) Observable์€ Disposable ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•˜๊ณ  onSubscribe ์ฝœ๋ฐฑ์„ ํ†ตํ•ด Observer๋กœ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์ด ์ž‘์—…์ด ์™„๋ฃŒ๋˜๋ฉด, ํ†ต์‹ ์ด ์—ฐ๊ฒฐ๋˜๊ณ  Observable์ด ์ถ”๊ฐ€์ ์ธ ๋Œ€๊ธฐ์—†์ด onNext๋ฅผ ํ†ตํ•ด ๊ฐ’๋“ค์„ ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ทธ๋ž˜์„œ Observable์€ ๋ฐฐ์••(backpressure)์„ ์ง€์›ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. Observer๊ฐ€ Observable์—๊ฒŒ ๋” ์ด์ƒ ๊ฐ’์„ ๋ณด๋‚ด์ง€ ๋ง๋ผ ์•Œ๋ฆด ๋ฐฉ๋ฒ•์ด ์—†์Šต๋‹ˆ๋‹ค.

Flowable

Flowable์€ ๋ชจ๋“  ๊ฒƒ์ด ๋น„์Šทํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ Observer์™€ Disposable ๋Œ€์‹  Subscriber์™€ Subscription๋ฅผ ๊ฐ€์ง‘๋‹ˆ๋‹ค.

Subscription์€ ์ถ”๊ฐ€๋กœ request(n) ๋ฉ”์„œ๋“œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋ฅผ ์ด์šฉํ•˜์—ฌ Subscriber๋Š” ๋ช…์‹œ์ ์œผ๋กœ Flowable์—๊ฒŒ ์š”์ฒญ๋œ ์–‘์˜ ์•„์ดํ…œ์„ ๋ณด๋‚ด๊ฒŒ ์š”์ฒญํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ฐ’์„ ์š”์ฒญํ•˜์ง€ ์•Š์œผ๋ฉด Flowable์€ ์–ด๋–ค ๊ฐ’๋„ ๋‚ด๋ณด๋‚ด์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์ด๊ฒŒ Flowable์ด ๋ฐฐ์••์„ ์ง€์›ํ•˜๋Š” ์ด์œ ์ž…๋‹ˆ๋‹ค.

์กฐ๋ฆฝ๊ณผ ๊ตฌ๋…

RxJava ๋ฐ˜์‘ํ˜• ํƒ€์ž…์„ ์‚ฌ์šฉํ•  ๋•Œ ๋‘ ์ค‘์š”ํ•œ ๋‹จ๊ณ„์ธ ์กฐ๋ฆฝ(assembly)๊ณผ ๊ตฌ๋…(subscribe)์ด ์žˆ์Šต๋‹ˆ๋‹ค.

์กฐ๋ฆฝํ•  ๋•Œ Rx ์ฒด์ธ์ด ๋งŒ๋“ค์–ด์ง€๊ณ  ๊ตฌ๋…ํ•  ๋•Œ ์šฐ๋ฆฌ๋Š” Rx ์ฒด์ธ์„ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค.

์•„๋ž˜์˜ ์˜ˆ์ œ๋ฅผ ๊ณ ๋ คํ•ด๋ณด์„ธ์š”.

Observable.just(1, 2, 3)
    .map { it + 1 }
    .filter { it < 3 }
    .subscribe()

์ด ๊ฒฝ์šฐ ์œ„์—์„œ ์•„๋ž˜๋กœ ์ด๋™ํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™์ธ ์ผ์ด ์ผ์–ด๋‚ฉ๋‹ˆ๋‹ค.

  • ObservableJust ๊ฐ์ฒด๊ฐ€ ์ƒ์„ฑ๋ฉ๋‹ˆ๋‹ค.
  • ObservableMap ๊ฐ์ฒด๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์ด์ „์— ์ƒ์„ฑ๋œ ObservableJust๋ฅผ ์ƒˆ Observable์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค. (๊ทธ๋ž˜์„œ ์กฐํ•ฉ์ด ๋ฉ๋‹ˆ๋‹ค.)
  • ObservableFilter ๊ฐ์ฒด๊ฐ€ ์ƒ์„ฑ๋˜๊ณ  ์ด์ „์— ์ƒ์„ฑ๋œ (ObservableJust๊ฐ€ ๋‚ด์žฅ๋œ) ObservableMap๋ฅผ ์ƒˆ Observable์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.
  • ObservableFilter๋ฅผ ๊ตฌ๋…ํ•˜์—ฌ ์‹ค์ œ ๊ตฌ๋…(subscription)์„ ์ž‘๋™ํ•ฉ๋‹ˆ๋‹ค.
  • ObservableFilter๋Š” ๋‚ด๋ถ€์˜ ์ž์ฒด ์˜ต์ €๋ฒ„๋ฅผ ๋งŒ๋“ค์–ด ObservableMap์„ ๊ตฌ๋…ํ•ฉ๋‹ˆ๋‹ค.
  • ObservableMap์€ ๋‚ด๋ถ€์˜ ์ž์ฒด ์˜ต์ €๋ฒ„๋ฅผ ๋งŒ๋“ค์–ด ObservableJust๋ฅผ ๊ตฌ๋…ํ•ฉ๋‹ˆ๋‹ค.
  • ObservableJust๋Š” onSubscribe ์ด๋ฒคํŠธ๋ฅผ ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณด๋ƒ…๋‹ˆ๋‹ค. (๋‹ค๋ฅธ ์˜ต์ €๋ฒ„๋ธ”๋“ค๋„ ์ด ์ด๋ฒคํŠธ๋ฅผ ์ฒด์ธ์˜ ์ตœ์‹  ์˜ต์ €๋ฒ„๋ธ”๊นŒ์ง€ ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ณด๋ƒ…๋‹ˆ๋‹ค.)
  • ObservableJust๋Š” ๊ฐ’์„ ๋ฐœํ–‰ํ•˜๊ณ  onNext ์ฝœ๋ฐฑ์„ ํ†ตํ•ด ๋‹ค์šด์ŠคํŠธ๋ฆผ์œผ๋กœ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์ด ์งง์€ Rx ์ฒด์ธ์—์„œ ๊ฝค ๋งŽ์€ ์ผ์ด ์ผ์–ด๋‚˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ์ฒด์ธ์ด Flowable ํƒ€์ž…์ด์—ˆ๋‹ค๋ฉด request(n)์˜ ์ถ”๊ฐ€ ํ†ต์‹ ์ด ๋ฐœ์ƒํ•ด ๋” ๋ณต์žกํ•ด์ง‘๋‹ˆ๋‹ค.

ํ์™€ ๋™๊ธฐํ™”

์—ฐ์‚ฐ์ž ์•ˆ์— ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๋‚ด๋ถ€ ํ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ํ์— ๋Œ€ํ•œ ์ ‘๊ทผ์€ ์ง๋ ฌํ™”๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. (์ด๋Š” ์ ์ ˆํ•œ ๋™๊ธฐํ™”๋ฅผ ํ†ตํ•ด์„œ ์ ‘๊ทผํ•ด์•ผํ•œ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.)

RxJava2๋Š” (์„ฑ๋Šฅ์„ ์œ„ํ•ด) ์›์ž ๊ฐ์ฒด(Atomic object, ์˜ˆ: AtomicInteger)์™€ compareAndSet ๋ฉ”์„œ๋“œ๋ฅผ ์ด์šฉํ•œ ๋ฌดํ•œ ๋ฐ˜๋ณต์— ๊ธฐ๋ฐ˜ํ•œ ๋ธ”๋กํ‚น ์—†๋Š” ๋™๊ธฐํ™”๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ๋ผ์ด๋ธŒ๋ฆฌ๋ฆฌ์—์„œ ์ผ๋ฐ˜์ ์œผ๋กœ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ฝ”๋“œ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

for (; ; ) {
    long r = state.get();

    if ((r & COMPLETED_MASK) != 0L) {
        return;
    }

    long u = r | COMPLETED_MASK;
    // (active, r) -> (complee, r) ์ „ํ™˜
    if (state.compareAndSet(r, u)) {
        // ๋งŒ์•ฝ ์š”์ฒญ๋œ ์–‘์ด 0์ด ์•„๋‹ˆ๋ฉด ํ๋ฅผ ๋น„์›๋‹ˆ๋‹ค.
        if (r != 0L) {
            postCompleteDrain(u, actual, queue, state, isCancelled);
        }

        return;
    }
}

์ฒด์ธ์˜ ๊ฐ ์—ฐ์‚ฐ์ž๊ฐ€ ๊ฐ์ž ํ๋ฅผ ๊ฐ€์ง„๋‹ค๊ณ  ํ•˜๋ฉด ์›์ž ๊ฐ์ฒด๋ฅผ ๊ฐ€์ง„ ์—ฐ์‚ฐ์ž์˜ ํ๋Š” ๊ฐ„์ ‘๋น„์šฉ์„ ๊ฐ€์ง‘๋‹ˆ๋‹ค.

๋ฌธ์ œ์ 

์œ„์˜ ๋ชจ๋“  ๊ฒƒ์„ ๊ณ ๋ คํ•ด RxJava๊ฐ€ ๊ฐ€์ง„ ๋ฌธ์ œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • ์กฐ๋ฆฝ ์‹œ๊ฐ„ ๊ฐ„์ ‘๋น„์šฉ - Rx ์ฒด์ธ์„ ์ƒ์„ฑํ•˜๋ฉด ๊ฝค ๋งŽ์€ ๊ฐ์ฒด๋ฅผ ๋งŒ๋“ค์–ด์•ผ ํ•˜๊ณ  ๋ฉ”๋ชจ๋ฆฌ ๊ฐ„์ ‘๋น„์šฉ๋ฅผ ์œ ๋ฐœํ•ฉ๋‹ˆ๋‹ค.
  • ๊ตฌ๋… ์‹œ๊ฐ„ ๊ฐ„์ ‘๋น„์šฉ - Rx ์ฒด์ธ์„ ๊ตฌ๋…ํ•˜๊ณ  ๋งŽ์€ ํ†ต์‹ ์ด ์ด๋ฃจ์–ด์ง€๋ฉด ์„ฑ๋Šฅ ์ƒ์˜ ๊ฐ„์ ‘๋น„์šฉ์ด ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค.
  • ํ• ๋‹น๊ณผ ์ง๋ ฌํ™” ๊ฐ„์ ‘๋น„์šฉ - ๋งค ์—ฐ์‚ฐ์ž๋งˆ๋‹ค ํ์™€ ์›์ž ๊ฐ์ฒด๊ฐ™์€ ๋‚ด๋ถ€ ๊ตฌ์กฐ๋ฅผ ๋งŒ๋“œ๋Š” ๊ฒƒ์€ ๋ฉ”๋ชจ๋ฆฌ์™€ ์„ฑ๋Šฅ์˜ ๊ฐ„์ ‘๋น„์šฉ์„ ์œ ๋ฐœํ•ฉ๋‹ˆ๋‹ค.

์—ฐ์‚ฐ์ž ๊ฒฐํ•ฉ

์„ฑ๋Šฅ๊ณผ ๋ฉ”๋ชจ๋ฆฌ ๋ฌธ์ œ๋ฅผ ํ’€๊ธฐ ์œ„ํ•ด ์—ฐ์‚ฐ์ž ๊ฒฐํ•ฉ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

๋‘๊ฐ€์ง€ ํ˜•ํƒœ์˜ ๊ฒฐํ•ฉ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

  • ๋งคํฌ๋กœ ๊ฒฐํ•ฉ (Macro fusion) - ๋ช‡ ๊ฐœ์˜ ์—ฐ์‚ฐ์„ ํ•˜๋‚˜๋กœ ํ•ฉ๋ณ‘ํ•˜์—ฌ ์กฐํ•ฉ๊ณผ ๊ตฌ๋… ๋™์•ˆ ์ƒ์„ฑ๋˜๋Š” ๊ฐ์ฒด์˜ ์ˆ˜๋ฅผ ์ตœ์†Œํ™” ํ•ฉ๋‹ˆ๋‹ค.
  • ๋งˆ์ดํฌ๋กœ ๊ฒฐํ•ฉ (Micro fusion) - ์—ฐ์‚ฐ์ž ์‚ฌ์ด์˜ ๋ถˆํ•„์š”ํ•œ ๋™๊ธฐํ™”์™ธ (ํ์™€ ๊ฐ™์€) ๊ณต์œ  ๋‚ด๋ถ€ ๊ตฌ์กฐ๋ฅผ ์ œ๊ฑฐํ•ฉ๋‹ˆ๋‹ค.

์กฐ๋ฆฝ์—์„œ ๋งคํฌ๋กœ ๊ฒฐํ•ฉ

์กฐ๋ฆฝ ๊ฒฐํ•ฉ

์กฐ๋ฆฝ์—์„œ ๋งคํฌ๋กœ ๊ฒฐํ•ฉ์€ Observable์˜ ์ˆ˜์™€ ์กฐํ•ฉ ๋•Œ ์ƒ์„ฑ๋œ ๊ฐ์ฒด๋ฅผ ์ตœ์†Œํ™”ํ•˜๋Š” ๋ฐ ์ง‘์ค‘ํ•ฉ๋‹ˆ๋‹ค. "์กฐ๋ฆฝ"์€ ์ด๋Ÿฐ ๊ฒƒ์„ ์ด์•ผ๊ธฐ ํ•ฉ๋‹ˆ๋‹ค.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

์กฐ๋ฆฝ ๊ฒฐํ•ฉ์˜ ๊ธฐ๋ณธ

Observable๋“ค์„ ์ตœ์ ํ™”ํ•˜๋Š” ๋‹จ์ˆœํ•œ ๋ฐฉ๋ฒ•์€ ํŠน๋ณ„ํ•œ ๊ฒฝ์šฐ์— ๋Œ€ํ•œ ๊ฒ€์‚ฌ๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ, ์ผ๋ฐ˜์ ์ธ Observable ๋ณด๋‹ค ๋‹จ์ˆœํ•œ Observable์„ ์ƒ์„ฑํ•˜๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด Observable.fromArray๋ฅผ ์‚ดํŽด๋ด…์‹œ๋‹ค. ์ด๋Š” ํ•ญ๋ชฉ์˜ ๊ฐœ์ˆ˜๊ฐ€ 0์ด๋‚˜ 1์ด๋ฉด ๊ฐ๊ฐ Observable.empty ๋˜๋Š” Observable.just๋กœ ๋‹ค์šด๊ทธ๋ ˆ์ด๋“œํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    if (items.length == 0) {
        return empty();
    } else if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

ScalarCallable

๊ฒฐํ•ฉ(fuseable) ํŒจํ‚ค์ง€์˜ ์ฒซ "๊ณ ๊ธ‰" ์ตœ์ ํ™”๋Š” ScalarCallable ์ธํ„ฐํŽ˜์ด์Šค์ž…๋‹ˆ๋‹ค.

public interface ScalarCallable<T> extends Callable<T> {

    // ์˜ˆ์™ธ๋ฅผ ๋‚ด์ง€ ์•Š๋„๋ก ์˜ค๋ฒ„๋ผ์ด๋“œํ•ฉ๋‹ˆ๋‹ค.
    @Override
    T call();
}

๊ธฐ๋ณธ์ ์œผ๋กœ ์ž๋ฐ”์˜ Callable ์ธํ„ฐํŽ˜์ด์Šค์™€ ๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” ๋™์ผํ•œ๋ฐ ์˜ˆ์™ธ๋ฅผ ๋‚ด์ง€ ์•Š๋Š” ์ฐจ์ด๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

ScalarCallable๋ž€ ๋ฐ˜์‘ํ˜• ํƒ€์ž…์€ ์กฐ๋ฆฝ ์‹œ๊ฐ„ ๋™์•ˆ ์•ˆ์ „ํ•˜๊ฒŒ ์ถ”์ถœํ•  ์ˆ˜ ์žˆ๋Š” ์ƒ์ˆ˜๊ฐ’์„ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ์ธํ„ฐํŽ˜์ด์Šค์ž…๋‹ˆ๋‹ค. ํŠนํžˆ ์ด๋Ÿฐ ๋ฐ˜์‘ํ˜• ํƒ€์ž…์€ ์ •ํ™•ํžˆ ๊ฐ’ ํ•˜๋‚˜๋ฅผ ๊ฐ€์ง€๊ฑฐ๋‚˜ ์™„์ „ํžˆ ํ•ญ๋ชฉ ์ž์ฒด๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.

๊ทธ๋ž˜์„œ ์šฐ๋ฆฌ๊ฐ€ call ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ• ๋–„, ๋ฐ˜ํ™˜ ๊ฐ’์ด null์ด์—ˆ๋‹ค๋ฉด ๋ฐ˜์‘ํ˜• ํƒ€์ž…์€ ์–ด๋–ค ๊ฐ’๋„ ๊ฐ€์ง€์ง€ ์•Š์œผ๋ฉฐ, null์ด ์•„๋‹Œ ๊ฐ’์ด ์žˆ๋‹ค๋ฉด ํ•˜๋‚˜์˜ ๊ฐ’๋งŒ ๊ฐ€์ง‘๋‹ˆ๋‹ค.

์„ค๋ช…๋œ ์ œ์•ฝ์„ ๊ธฐ๋ฐ˜์œผ๋กœ Observable, Floawable, Maybe์˜ empty, just ์—ฐ์‚ฐ์ž๋งŒ์ด ์ด ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํ‘œ์‹œ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

xMap ์—ฐ์‚ฐ์ž(flatMap, switchMap, contactMap)๋กœ ์˜ˆ๋ฅผ ๋“ค๊ฒ ์Šต๋‹ˆ๋‹ค. ์†Œ์Šค๊ฐ€ ์ด ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํ‘œ์‹œ๋  ์ˆ˜ ์žˆ๋‹ค๋ฉด ์ด ์ตœ์ ํ™”๋ฅผ ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {

    if (this instanceOf ScalarCallable) {
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    return RxJavaPlugins.onAssembly(new ObservableFlatmap<T, R>(this, ...));
}

์ด ๊ฒฝ์šฐ์— ์†Œ์Šค๊ฐ€ ScalarCallable๋กœ ํ‘œ์‹œ๋˜์—ˆ๋‹ค๋ฉด (๊ฝค ๋ฌด๊ฑฐ์šด) ์ „์ฒด ๊ตฌํ˜„ ๋Œ€์‹ ์— xMap์˜ ๋‹จ์ˆœํ™”๋œ ๋ฒ„์ „์œผ๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

FuseToXXX

fuseable ํŒจํ‚ค์ง€์— ์„ธ๊ฐ€์ง€ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

public interface FuseToObservable<T> {
    Observable<T> fuseToObservable();
}

public interface FuseToFlowable<T> {
    Floawable<T> fuseToFlowable();
}

public interface FuseToMaybe<T> {
    Maybe<T> fuseToMaybe();
}

FuseToObservable์„ ์‚ดํŽด๋ด…์‹œ๋‹ค. ๋‹ค๋ฅธ ์ธํ„ฐํŽ˜์ด์Šค๋“ค๋„ ์ „๋ถ€ ๋น„์Šทํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์Œ Rx ์ฒด์ธ์„ ๊ณ ๋ คํ•ด๋ด…์‹œ๋‹ค.

Observable.range(1, 10)
    .count()
    .toObservable()
    .subscribe()

์—ฌ๊ธฐ์„œ range๋ฅผ ๋งŒ๋“ค๊ณ  ๋‚ด๋ณด๋‚ธ ํ•ญ๋ชฉ์˜ ๊ฐฏ์ˆ˜๋ฅผ ์„ธ์–ด๋ด…์‹œ๋‹ค. count ์—ฐ์‚ฐ์ž๋Š” Single์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ์šฐ๋ฆฌ๋Š” Observble์„ ๋ฐ›๊ธธ ์›ํ•˜๊ณ ์š”. (๋‹ค๋ฅธ Observable๊ณผ ์ด ๊ฒฐ๊ณผ๋ฅผ ๋จธ์ง€ํ•ด๋Š” ์˜ˆ๋กœ ๋“ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.) Rx ์ฒด์ธ์— ์ถ”๊ฐ€์ ์ธ toObservable ์—ฐ์‚ฐ์ž๋ฅผ ์ถ”๊ฐ€ํ•ด ๋” ๋ณด์žกํ•˜๊ณ  ๋ฌด๊ฑฐ์›Œ์กŒ์Šต๋‹ˆ๋‹ค.

FuseToObservable์€ ์—ฌ๊ธฐ์— ๋„์›€์ด ๋ฉ๋‹ˆ๋‹ค. ์ด ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ํ•˜๋Š” ์ผ์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. Observable์ด ์•„๋‹Œ ๋ฐ˜์‘ํ˜• ํƒ€์ž…์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ๋ช‡๋ช‡ ์—ฐ์‚ฐ์ž๋“ค์€ ๋‚ด๋ถ€์— Observable์„ ๋ฐ˜ํ™˜ํ•˜๋Š” ๊ตฌํ˜„์„ ๊ฐ€์ง€๊ณ  ์žˆ๊ณ  ์ด ๊ตฌํ˜„์€ toObservable ํ˜ธ์ถœ์—์„œ ์ถ”์ถœํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

count ์˜ˆ์ œ๋ฅผ ์‚ดํŽด๋ด…์‹œ๋‹ค.

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public interface Single<Long> count() {
    return RxJavaPlugins.onAssembly(new ObservableCountSingle<T>(this));
}

๊ธฐ๋ณธ์ ์œผ๋กœ ObservableCountSingle์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ์ด ์—ฐ์‚ฐ์ž๋ฅผ ์กฐ๊ธˆ ๋” ๊นŠ๊ฒŒ ์‚ดํŽด๋ณด๋ฉด ์ด ๊ตฌํ˜„์ด FuseToObservable ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๊ณ  ๊ฒฐํ•ฉ ๋ชจ๋“œ๋กœ ๋ถˆ๋ฆด ๋•Œ ๋‹ค๋ฅธ ๊ตฌํ˜„์„ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ์„ ์•Œ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

public interface class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
    ...

    @Override
    public Observable<Long> fuseToObservable() {
        return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
    }
}

๊ทธ๋ฆฌ๊ณ  toObservable์„ ํ˜ธ์ถœํ•˜๋ฉด ๊ทธ ๊ตฌํ˜„์ด ์ถ”์ถœ๋˜๊ณ  toObservable์„ ์œ„ํ•œ Observable์ด ์ƒ์„ฑ๋˜์ง€ ์•Š์•„ ํšจ์œจ์ ์ž…๋‹ˆ๋‹ค.

public final Observable<T> toObservable() {
    if (this instanceOf FuseToObservable) {
        return ((FuseToObservable<T>)this).fuseToObservable();
    }
    return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
}

๊ตฌ๋… ์‹œ ๋งคํฌ๋กœ ๊ฒฐํ•ฉ

๊ตฌ๋… ์‹œ ๋งคํฌ๋กœ ๊ฒฐํ•ฉ์€ ์กฐ๋ฆฝ์— ์ด๋ฃจ์–ด์ง€๋Š” ๊ฒƒ๊ณผ ๋™์ผํ•œ ์ตœ์ ํ™”์— ์ค‘์ ์„ ๋‘์ง€๋งŒ subscribeActual ๋ฉ”์„œ๋“œ ๋‚ด์—์„œ ์ˆ˜ํ–‰๋ฉ๋‹ˆ๋‹ค.

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

๊ตฌ๋… ์ „์— ๋ฐ์ดํ„ฐ๊ฐ€ ๋ถˆํ™œ์‹คํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์กฐ๋ฆฝ ์‹œ์ ์— ์ตœ์ ํ™”๊ฐ€ ๋ถˆ๊ฐ€๋Šฅํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์กฐ๋ฆฝ ๋ณด๋‹ค ๊ตฌ๋… ๋•Œ ์ตœ์ ํ™”๋ฅผ ํ•˜๋Š” ๊ฒƒ์ด ๋” ํŽธ๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ตฌ๋… ๊ฒฐํ•ฉ์˜ ๊ธฐ๋ณธ

์กฐ๋ฆฝ๋•Œ์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์–ด๋–ค ํŠน์ˆ˜ํ•œ ์กฐ๊ฑด์„ ํ™•์ธํ•˜์—ฌ ์ผ๋ฐ˜์ ์ด์ง€ ์•Š๋Š” ๋‹จ์ˆœํ•œ ๊ตฌํ˜„์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฐ„๋‹จํ•œ ์ตœ์ ํ™”๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ๋“ค์–ด Observable.amb๋Š” ์ œ๊ณต๋œ ์†Œ์Šค์˜ ๊ฐฏ์ˆ˜๋ฅผ ํ™•์ธํ•˜์—ฌ ๋ฌด๊ฑฐ์šด AmbCoordinator๋ฅผ ์ธ์Šคํ„ด์Šคํ™”ํ• ์ง€ ๋ง์ง€ ๊ฒฐ์ •ํ•ฉ๋‹ˆ๋‹ค.

public void subscribeActual(Observer<? super T> observer) {
    ObservableSource<? extends T>[] sources = this.sources;
    int count = 0;

    ...

    if (count == 0) {
        EmptyDisposable.complete(observer);
        return;
    } else if (count == 1) {
        sources[0].subscribe(observer);
        return;
    }

    AmbCoordinator<T> ac = new AmbCoordinator<T>(observer, count);
    ac.subscribe(sources);
}

Callable

์กฐ๋ฆฝ๋™์•ˆ ScalarCallable ์ธํ„ฐํŽ˜์ด์Šค๋กœ ๋ช‡๊ฐ€์ง€ ์ตœ์ ํ™”๋ฅผ ํ•ฉ๋‹ˆ๋‹ค. ๊ตฌ๋…ํ•  ๋•Œ๋„ Callable ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์ด์šฉํ•œ ๋น„์Šทํ•œ ์ตœ์ ํ™”๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

์ฃผ์˜: ScalarCallable์ด Callable์„ ํ™•์žฅํ•œ ๊ฒƒ ์ฒ˜๋Ÿผ - ScalarCallable๋กœ ์กฐ๋ฆฝ ๋•Œ ์ ์šฉํ•œ ์ตœ์ ํ™”๋“ค์€ ๊ตฌ๋… ๋•Œ Callable์— ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

Callable ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํ‘œ์‹œ๋œ Observable์— ๋Œ€ํ•œ ๊ตฌ๋…ํ•  ๋•Œ xMap ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•  ๋•Œ๋Š” ๋‹จ์ˆœํ™”๋œ ๊ตฌํ˜„์œผ๋กœ ๋Œ€์ฒดํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

๋งˆ์ดํฌ๋กœ ๊ฒฐํ•ฉ

๋งˆ์ดํฌ๋กœ ๊ฒฐํ•ฉ์€ ์–ด๋–ค ๋™๊ธฐํ™”์™€ ํ์™€ ๊ฐ™์€ ๊ณต์œ  ๋‚ด๋ถ€ ๊ตฌ์กฐ๋ฅผ ์ค„์—ฌ ๊ฐ„์ ‘๋น„์šฉ์„ ์ตœ์†Œํ™”ํ•˜๋Š” ๊ฒƒ์„ ๋ชฉํ‘œ๋กœ ํ•œ๋‹ค.

ConditionalSubscriber

Flowable.filter์™€ ๊ฐ™์€ ์—ฐ์‚ฐ์ž๊ฐ€ ์‚ฌ์šฉ๋  ๋•Œ ์–ด๋–ค ์ผ์ด ์ผ์–ด๋‚˜๋Š”์ง€๋ฅผ ์‚ดํŽด๋ณด์ž.

์—…์ŠคํŠธ๋ฆผ, filter ์—ฐ์‚ฐ์ž, ๋‹ค์šด์ŠคํŠธ๋ฆผ์„ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐ’์ด 5๋ณด๋‹ค ์ž‘์€์ง€ ๊ฒ€์‚ฌํ•˜๋Š” ํ•„ํ„ฐ๊ฐ€ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ์‹œ๋‹ค. ๊ตฌ๋…์ด ๋˜๋ฉด ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ์—…์ŠคํŠธ๋ฆผ์—๊ฒŒ ๋ช‡๊ฐœ์˜ ํ•ญ๋ชฉ์„ ์š”์ฒญํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

  • ํ•„ํ„ฐ์—๊ฒŒ ๋‹ค์šด์ŠคํŠธ๋ฆฝ์€ 1๊ฐœ์˜ ํ•ญ๋ชฉ์„ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
  • ํ•„ํ„ฐ๋Š” ์—…์ŠคํŠธ๋ฆผ์—๊ฒŒ 1๊ฐœ์˜ ํ•ญ๋ชฉ์„ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
  • ์—…์ŠคํŠธ๋ฆผ์€ ํ•ญ๋ชฉ์„ ์ƒ์„ฑํ•˜๊ณ  ํ•„ํ„ฐ๋กœ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค. (์ˆซ์ž๊ฐ€ 1์ด๋ผ๊ณ  ํ•ฉ์‹œ๋‹ค.)
  • ํ•„ํ„ฐ๋Š” ๊ฐ’์ด ์ˆ ์–ด(predicate)๋ฅผ ๋งŒ์กฑํ•˜๋Š”์ง€ ํ™•์ธํ•˜์—ฌ ๋‹ค์šด์ŠคํŠธ๋ฆผ์— ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.
  • ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ํ•ญ๋ชฉ์„ ๋ฐ›์•„๋“ค์ด๊ณ  ํ•„ํ„ฐ์—๊ฒŒ ํ•˜๋‚˜ ๋” ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
  • ํ•„ํ„ฐ๋Š” ์—…์ŠคํŠธ๋ฆผ์—๊ฒŒ 1๊ฐœ์˜ ํ•ญ๋ชฉ์„ ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
  • ์—…์ŠคํŠธ๋ฆผ์€ ํ•ญ๋ชฉ์„ ๋งŒ๋“ค๊ณ  ํ•„ํ„ฐ์—๊ฒŒ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค. (์ˆซ์ž๊ฐ€ 10์ด๋ผ๊ณ  ํ•ฉ์‹œ๋‹ค.)
  • ํ•„ํ„ฐ๋Š” ๊ฐ’์ด ์ˆ ์–ด๋ฅผ ๋งŒ์กฑํ•˜์ง€ ๋ชปํ•œ ๊ฒƒ์„ ํ™•์ธํ•˜๊ณ  ๋‹ค์šด์ŠคํŠธ๋ฆผ์—๊ฒŒ๋Š” ์ „๋‹ฌํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋‹ค์šด์ŠคํŠธ๋ฆผ์ด ํ•˜๋‚˜์˜ ํ•ญ๋ชฉ์„ ์š”์ฒญํ•˜์˜€์ง€๋งŒ ํ•„ํ„ฐ๊ฐ€ ์ œ๊ณตํ•˜์ง€ ์•Š์•˜๊ณ  ๊ทธ๋ž˜์„œ ํ•„ํ„ฐ๋Š” ์—…์ŠคํŠธ๋ฆผ์—๊ฒŒ ํ•˜๋‚˜ ๋” ์š”์ฒญํ•ฉ๋‹ˆ๋‹ค.
  • ์ŠคํŠธ๋ฆผ์ด ์ข…๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ์ด ์ž‘์—…์€ ๋ฐ˜๋ณต๋ฉ๋‹ˆ๋‹ค.

์—ฐ์‚ฐ์ž ๊ฐ„ ๋งŽ์€ ํ†ต์‹ ์ด ์žˆ๋˜ ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ๊ณ  ๋” ์ค‘์š”ํ•œ ๊ฒƒ์€ ๊ฐ ์ด๋ฒคํŠธ๊ฐ€ ์ง๋ ฌํ™” ๋œ ๋ฐฉ์‹์œผ๋กœ ์ œ๊ณต๋˜๊ธฐ ๋•Œ๋ฌธ์— ์•ฝ๊ฐ„์˜ ๊ฐ„์ ‘๋น„์šฉ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

๊ทธ ์‚ฌ์ด์— ๋‘๊ฐœ์˜ ์—ฐ์‚ฐ์ž๊ฐ€ ์žˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•ฉ์‹œ๋‹ค. ํ†ต์‹ ์€ ์ƒ๋‹นํ•œ ๊ฐ„์ ‘๋น„์šฉ์„ ์ฆ๊ฐ€์‹œํ‚ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ConditionalSubscriber๋Š” ์ด ์ƒํ™ฉ์„ ๊ตฌํ•ฉ๋‹ˆ๋‹ค.

public interface ConditionalSubscriber<T> extends FlowableSubscriber<T> {
    boolean tryOnNext(T t);
}

์ผ๋ฐ˜์ ์œผ๋กœ Subscriber์˜ onNext ์ฝœ๋ฐฑ์€ ์•„๋ฌด ๊ฐ’๋„ ๋ฐ˜ํ™˜ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์—…์ŠคํŠธ๋ฆผ์€ ๋‹จ์ˆœํžˆ ๊ฐ’์„ ์ฝœ๋ฐฑ์„ ํ†ตํ•ด ์ œ๊ณตํ•˜๊ณ  ๋‹ค์šด์ŠคํŠธ๋ฆผ์˜ ์ƒˆ๋กœ์šด ์š”์ฒญ์„ ๊ธฐ๋‹ค๋ฆฝ๋‹ˆ๋‹ค. ConditionalSubscriber๋Š” ์ถ”๊ฐ€์ ์ธ tryOnNext ๋ฉ”์„œ๋“œ๋ฅผ ๊ฐ€์ง‘๋‹ˆ๋‹ค. ์ด๋Š” onNext์™€ ๋น„์Šทํ•œ๋ฐ ๊ฐ’์„ ๋ฐ›์„ ์ˆ˜ ์žˆ๋Š๋ƒ ๊ฑฐ์ ˆ๋˜์—ˆ๋ƒ์— ๋”ฐ๋ผ ๋ถˆ๋ฆฌ์–ธ ๊ฐ’์„ ์ฆ‰ํžˆ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Š” ์—…์ŠคํŠธ๋ฆผ์ด ์ง์ ‘ ์‘๋‹ต์„ ์ˆ˜์‹ ํ•  ๋•Œ request(n) ํ˜ธ์ถœ์ด ์š”์ฒญ๋˜๋Š” ํšŒ์ˆ˜๋ฅผ ์ค„์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

`Flowable.filter' ๊ตฌํ˜„์˜ ์˜ˆ๋ฅผ ์‚ดํŽด๋ณด๋ฉด ์—…์ŠคํŠธ๋ฆผ filter๊ฐ€ ์ง์ ‘ ๋‹ค์šด์ŠคํŠธ๋ฆผ filter์˜ ์ˆ ์–ด๋ฅผ ์ ‘๊ทผํ•˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@Override
public boolean tryOnNext(T t) {
    ...

    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    return b && downstream.tryOnNext(t);
}

์ด๋Š” ๋ช‡ ์ฐจ๋ก€์˜ ์š”์ฒญ์„ ์ ˆ์•ฝํ•ฉ๋‹ˆ๋‹ค.

์ตœ์ ํ™”๊ฐ€ ์—ฐ์‡„ ํ•„ํ„ฐ ์—ฐ์‚ฐ์ž์˜ ๊ฐ„์ ‘ ๋น„์šฉ์„ ์ค„์ด๊ณ ์ž ํ•˜๋Š”๊ฒŒ ๋ชฉํ‘œ๋ผ๋ฉด ๊ทธ๋‹ค์ง€ ๋†€๋ž์ง€๋Š” ์•Š์„ ๊ฒƒ์ž…๋‹ˆ๋‹ค. (์–ด์จŒ๋“  ํ•˜๋‚˜์˜ ํ•„ํ„ฐ ์—ฐ์‚ฐ์ž๋กœ ์“ธ ์ˆ˜ ์žˆ๋Š” ๊ฒƒ ์ฒ˜๋Ÿผ ๋ณด์ด๋‹ˆ๊น์š”.) ์ข‹์€ ์ ์€ Flowable.map ์—ญ์‹œ ConditionalSubscriber๋กœ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๊ณ  ์—ฌ๋Ÿฌ ํ•„ํ„ฐ์™€ ๋งต ์—ฐ์‚ฐ์ž๊ฐ€ ์—ฐ์‡„์ ์œผ๋กœ ํ•จ๊ป˜ ์žˆ๋‹ค๋ฉด ๊ฐ„์ ‘ ๋น„์šฉ์„ ๋” ์ค„์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@Override
public boolean tryOnNext(T t) {
    ...

    U v;

    tru {
        v = ObjectHelper.requireNonNull(mapper.apply(t), ...);
    } catch (Throwable ex) {
        fail(ex);
    }
}

Queue fuseable

๊ฐ€์žฅ ๋ณต์žกํ•œ ๋งˆ์ดํฌ๋กœ ๊ฒฐํ•ฉ์€ ์—ฐ์‚ฐ์ž ์‚ฌ์ด์˜ ๊ณต์œ  ๋‚ด๋ถ€ ํ๋“ค์— ๊ธฐ๋ฐ˜ํ•ฉ๋‹ˆ๋‹ค. ์ „์ฒด ์ตœ์ ํ™”๋Š” QueueSubscription ์ธํ„ฐํŽ˜์ด์Šค์— ๊ธฐ๋ฐ˜ํ•ฉ๋‹ˆ๋‹ค.

public interface QueueSubscription<T> extends QueueFuseable<T>, Subscription {
}

๊ธฐ๋ณธ์ ์œผ๋กœ Queue์™€ Subscription์ด ํ•œ ์ธํ„ฐํŽ˜์ด์Šค ์•„๋ž˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ํ ์ธํ„ฐํŽ˜์ด์Šค๋Š” ์ž๋ฐ”์˜ ๊ฐ„๋‹จํ•œ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ์•„๋‹™๋‹ˆ๋‹ค. ๋Œ€์‹  ์ถ”๊ฐ€์ ์ธ ๋ฉ”์„œ๋“œ requestFusion์„ ๊ฐ€์ง‘๋‹ˆ๋‹ค.

public interface QueueFuseable<T> extends SimpleQueue<T> {

    int requestFusion(int mode);

    int NONE = 0;
    int SYNC = 1;
    int ASYNC = 2;
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;
}

Flowable๊ณผ Subscriber ์‚ฌ์ด์— onXXX ์ฝœ๋ฐฑ์„ ์‚ฌ์šฉํ•˜๋Š” ์ผ๋ฐ˜์ ์ธ ์ปค๋ฎค๋‹ˆ์ผ€์ด์…˜๊ณผ ๋น„๊ตํ•ด์„œ, ์—…์ŠคํŠธ๋ฆผ์€ Subscription ๋ฟ๋งŒ ์•„๋‹ˆ๋ผ QueueSubscription๋ฅผ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. QueueSubscription์€ ์ง์ ‘์ ์œผ๋กœ ๋‚ด๋ถ€์˜ ํ์— ์ ‘๊ทผํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋™์ž‘๋ฐฉ์‹์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค. ๋จผ์ € onSubscribe ๋™์•ˆ ์—…์ŠคํŠธ๋ฆผ๊ณผ ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ๊ฒฐํ•ฉ์— ํ•ฉ์˜ํ•˜๊ณ  ์ž‘๋™ํ•  ๊ฒฐํ•ฉ ๋ชจ๋“œ(fusion mode)๋ฅผ ์„ ํƒํ•ฉ๋‹ˆ๋‹ค. ๊ฒฐํ•ฉ์ด ํ•ฉ์˜๋˜๋ฉด ์ƒˆ๋กœ์šด ํ†ต์‹  ๊ตฌํ˜„์ด ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ๊ทธ๋ ‡์ง€ ์•Š๋‹ค๋ฉด onXXX ์ฝœ๋ฐฑ์„ ์‚ฌ์šฉํ•˜๋Š” ์ „ํ†ต์ ์ธ ํ†ต์‹ ์ด ์ˆ˜๋ฆฝ๋ฉ๋‹ˆ๋‹ค.

์ผ๋ฐ˜์ ์œผ๋กœ ๊ฒฐํ•ฉ์ด ์ˆ˜๋ฆฝ๋˜๋ฉด ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ์—…์ŠคํŠธ๋ฆผ ํ์˜ poll() ๋ฉ”์„œ๋“œ๋ฅผ ์ง์ ‘ ํ˜ธ์ถœํ•˜์—ฌ ํ•ญ๋ชฉ์„ ์–ป์„ ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์„ธ๊ฐ€์ง€ ๊ฒฐํ•ฉ ๋ชจ๋“œ๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.

  • NONE - ๊ฒฐํ•ฉ์„ ํ•˜์ง€ ์•Š๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.
  • SYNC - ๋™๊ธฐ์ ์ธ ๊ฒฐํ•ฉ์ด ์ด๋ฃจ์–ด์ง‘๋‹ˆ๋‹ค.
  • ASYNC - ๋น„๋™๊ธฐ์ ์ธ ๊ฒฐํ•ฉ์ด ์ด๋ฃจ์–ด์ง‘๋‹ˆ๋‹ค.

ANY๋Š” ๋‹จ์ˆœํžˆ SYNC์ด๊ฑฐ๋‚˜ ASYNC์ž…๋‹ˆ๋‹ค. (์ด๋Š” ์—…์ŠคํŠธ๋ฆผ์ด ๋ฌด์—‡์„ ์ง€์›ํ•˜๋ƒ์— ๋”ฐ๋ผ ์ˆ˜๋ฆฝ์ด ๊ฒฐ์ •๋ฉ๋‹ˆ๋‹ค.)

SYNC ๊ฒฐํ•ฉ

์—…์ŠคํŠธ๋ฆผ์˜ ๊ฐ’์ด ์ด๋ฏธ ์ •์ ์ด๊ฑฐ๋‚˜ poll()์ด ํ˜ธ์ถœ๋  ๋•Œ ๋™๊ธฐ์ ์œผ๋กœ ์ƒ์„ฑ๋œ๋‹ค๋ฉด SYNC ๊ฒฐํ•ฉ์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋งŒ์•ฝ ์—…์ŠคํŠธ๋ฆผ๊ณผ ๋‹ค์šด์ŠคํŠธ๋ฆผ์ด ๋™๊ธฐ ๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ๋กœ ํ•ฉ์˜๋˜๋ฉด ์•„๋ก€์˜ ๊ณ„์•ฝ์„ ๋”ฐ๋ฆ…๋‹ˆ๋‹ค.

  • ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ํ•„์š”์— ๋”ฐ๋ผ ์ง์ ‘ poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.
  • poll() ๋ฉ”์„œ๋“œ๋Š” ์˜ˆ์™ธ๋ฅผ ๋˜์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” onError์™€ ๋™์ผํ•ฉ๋‹ˆ๋‹ค.
  • poll() ๋ฉ”์„œ๋“œ๋Š” null์„ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” onComplete์™€ ๋™์ผํ•ฉ๋‹ˆ๋‹ค.
  • poll() ๋ฉ”์„œ๋“œ๋Š” null์ด ์•„๋‹Œ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” onNext์™€ ๋™์ผํ•ฉ๋‹ˆ๋‹ค.
  • ์—…์ŠคํŠธ๋ฆผ์€ ์–ด๋–ค onXXX ์ฝœ๋ฐฑ๋„ ํ˜ธ์ถœํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

SYNC ๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์ง€์›ํ•˜๋Š” ์—ฐ์‚ฐ์ž์˜ ์˜ˆ๋Š” Flowable.range์ž…๋‹ˆ๋‹ค.

@Override
public final int requestFusion(int mode) {
    return mode & SYNC;
}

@Nullable
@Override
public final Integer poll() {
    int i = index;
    if (i == end) {
        return null;
    }
    index = i + 1;
    return i;
}

ASYNC ๊ฒฐํ•ฉ

poll()์„ ํ˜ธ์ถœํ•  ๋•Œ ๋‚˜์ค‘์—(eventually) ์—…์ŠคํŠธ๋ฆผ์˜ ๊ฐ’์ด ์‚ฌ์šฉ๊ฐ€๋Šฅํ•ด์งˆ๋•Œ ASYNC ๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์—…์ŠคํŠธ๋ฆผ๊ณผ ๋‹ค์šด์ŠคํŠธ๋ฆผ์ด ๋น„๋™๊ธฐ ๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ๋กœ ํ•ฉ์˜๋˜๋ฉด ์•„๋ž˜์˜ ๊ณ„์•ฝ์„ ๋”ฐ๋ฆ…๋‹ˆ๋‹ค.

  • ์—…์ŠคํŠธ๋ฆผ์€ ์ผ๋ฐ˜์ ์œผ๋กœ onError์™€ onComplete๋ฅผ ๋ณด๋ƒ…๋‹ˆ๋‹ค.
  • onNext์€ ์—…์ŠคํŠธ๋ฆผ์˜ ๊ฐ’ ๋ฟ ์•„๋‹ˆ๋ผ null์„ ๋Œ€์‹  ๊ฐ€์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๋‹ค์šด์ŠคํŠธ๋ฆผ์€ ์ด onNext๋ฅผ poll()์„ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋Š” ํ‘œ์‹์œผ๋กœ ์ทจ๊ธ‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
  • poll()์˜ ํ˜ธ์ถœ์ž๋Š” ์˜ˆ์™ธ๋ฅผ ์ฒ˜๋ฆฌํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๋งž์Šต๋‹ˆ๋‹ค. RxJava์—์„œ onNext๋Š” null๊ฐ’์„ ๊ฐ€์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ASYNC ๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์ง€์›ํ•˜๋Š” ์—ฐ์‚ฐ์ž์˜ ์˜ˆ๋Š” Flowable.filter์ž…๋‹ˆ๋‹ค.

public T poll() throws Exception {
    QueueSubscription<T> qs = this.qs;
    Predicate<? super T> f = filter;

    for (;;) {
        T t = qs.poll();
        if (t == null) {
            return null;
        }

        if (f.test(t)) {
            return t;
        }

        if (sourceMode == ASYNC) {
            qs.request(1);
        }
    }
}
@Override
public boolean tryOnNext(T t) {
    if (done) {
        return false;
    }

    if (sourceMode != NONE) {
        return downstream.tryOnNext(null);
    }

    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    return b && downstream.tryOnNext(t);
}

๊ฒฐํ•ฉ ๋ชจ๋“œ๋ฅผ ์ง€์›ํ•˜๋Š” ๋ช‡๊ฐœ์˜ ์—ฐ์‚ฐ์ž๋ฅผ ์‚ดํˆ์ง€๋งŒ ์ด ๋ชจ๋“œ๋ฅผ ํ™œ์„ฑํ™”ํ•˜๊ธฐ ์œ„ํ•ด ๋จผ์ € ๊ฒฐํ•ฉ์„ ์š”์ฒญํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. Flowable.flatMap ๋‚ด์—์„œ ๊ฒฐํ•ฉ ๋ชจ๋“œ๊ฐ€ ์š”์ฒญํ•˜๋Š” ์˜ˆ์ž…๋‹ˆ๋‹ค.

@Override
public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.setOnce(this, s)) {
        if (s instanceOf QueueSubscription) {
            QueueSubscription<U> qs = (QueueSubscription<U>) s;
            int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);
            if (m == QueueSubscription.SYNC) {
                fusionMode = m;
                queue = qs;
                done = true;
                paent.drain();
            }
            if (m == QueueSubscription.ASYNC) {
                fusionMode = m;
                queue = qs;
            }
        }
        s.request(bufferSize);
    }
}

์†Œ์Šค๊ฐ€ QueueSubscription์„ ๊ตฌํ˜„ํ•œ๋‹ค๋ฉด ๊ตฌ๋… ๋™์•ˆ ๊ฒฐํ•ฉ ๋ชจ๋“œ ANY๊ฐ€ ์š”์ฒญ๋˜๋Š” ๊ฒƒ์„ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์†Œ์Šค์— ์˜ํ•ด ์ˆ˜๋ฝ๋œ ๋ชจ๋“œ์— ๋”ฐ๋ผ ๋‹ค๋ฅธ ์ „๋žต์ด ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.

QueueSubscription ์Šค๋ ˆ๋“œ

map ์•ˆ์—์„œ ๋ฌด๊ฑฐ์šด ์—ฐ์‚ฐ์ด ์ด๋ฃจ์–ด์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๊ฒฝ์šฐ (์ง์ ‘ ํด๋ง์ผ ๊ฒฝ์šฐ) ํ•ด๋‹น ๊ณ„์‚ฐ์ด ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋กœ ์œ ์ถœ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด ์ถ”๊ฐ€์ ์ธ ๋งˆ์ปค ์˜ต์…˜ BOUNDARY๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Š” poll ๋ฉ”์„œ๋“œ์˜ ํ˜ธ์ถœ์ž๊ฐ€ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์ผ ์ˆ˜ ์žˆ์Œ์„ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค. ์—ฐ์‚ฐ์ž๋Š” BOUNDARY ์˜ต์…˜์„ ๋ฌด์‹œํ•  ์ˆ˜ ์žˆ๊ณ , ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ์—์„œ ํ์˜ ์ ‘๊ทผ์„ ํ—ˆ์šฉํ•  ์ˆ˜๋„, BOUNDARY ์˜ต์…˜์ด ์š”์ฒญ๋  ๊ฒฝ์šฐ ๊ฒฐํ•ฉ์„ ๊ฑฐ๋ถ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@Override
public int requestFusion(int mode) {
    return transitiveBoundaryFusion(mode);
}

(transitiveBoundaryFusion) ๋‚ด๋ถ€๋Š” BOUNDARY ๋ชจ๋“œ๋ฅผ ํ—ˆ์šฉํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

protected final int transitiveBoundaryFusion(int mode) {
    QueueSubscription<T> qs = this.qs;
    if (qs != null) {
        if ((mode & BOUNDARY) == 0) {
            int m = qs.requestFusion(mode);
            if (m != NONE) {
                sourceMode = m;
            }
            return m;
        }
    }
    return NONE;
}

๊ฒฐ๋ก 

์ด ๊ธ€์—์„œ RxJava์˜ ๋ช‡๊ฐ€์ง€ ์ตœ์ ํ™”์˜ ๊ฐœ์š”๋ฅผ ์‚ดํŽด๋ณด๊ณ  ๋ช‡๊ฐ€์ง€ ํฅ๋ฏธ๋กœ์šด ๊ฒƒ์„ ๋ดค์Šต๋‹ˆ๋‹ค.

  • RxJava 2์˜ Observable์€ ๋ฐฐ์••์„ ์ง€์›ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. (๋” ์ด์ƒ์˜ ํ•ญ๋ชฉ์ด ์—†๋‹ค๋Š” ๊ฑธ ์—…์ŠคํŠธ๋ฆผ์— ์•Œ๋ฆด ๋ฐฉ๋ฒ•์ด ์—†์Šต๋‹ˆ๋‹ค.)
  • ๋ช‡๊ฐ€์ง€ ๋‚ด๋ถ€ ์ตœ์ ํ™”๊ฐ€ null ๊ฐ’์„ ์ฝœ๋ฐฑ์— ์ „ํ•˜๋Š” ๊ฒƒ์— ๊ธฐ๋ฐ˜ํ•˜๊ธฐ ๋•Œ๋ฌธ์— RxJava์—์„œ null ๊ฐ’์„ ์ „๋‹ฌํ•˜๋Š” ๊ฒƒ์€ ์•ˆ๋ฉ๋‹ˆ๋‹ค.
  • ์ตœ์ ํ™”๋ฅผ ๋ชจ๋‘ ๋„๊ณ  ์‹ถ๋‹ค๋ฉด hide() ์—ฐ์‚ฐ์ž๋Š” ๋งค์šฐ ์ค‘์š”ํ•ฉ๋‹ˆ๋‹ค.
  • ์—ฐ์‚ฐ์ž ๊ฒฐํ•ฉ์€ ํ™”๋ คํ•˜์ง€๋งŒ ์—ฌ๋Ÿฌ ์ดˆ์ ํ™” ์ค‘์˜ ํ•˜๋‚˜์— ๋ถˆ๊ณผํ•ฉ๋‹ˆ๋‹ค. ๋ชจ๋“  ์—ฐ์‚ฐ์ž์—์„œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋Š” ๊ฒƒ์€ ์•„๋‹™๋‹ˆ๋‹ค. ์ตœ์ ํ™” ๋  ๊ฒƒ ๊ฐ™์•„ ๋ณด์ด์ง€๋งŒ ๊ฒฝ์šฐ์— ๋”ฐ๋ผ ์–ด๋–ค ์ตœ์ ํ™”๋„ ์ž‘๋™ํ•˜์ง€ ์•Š์•„ ๋†€๋ž„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ ์ด์œ ๋Š” ์ด๋Ÿฐํ•œ ์ตœ์ ํ™”๊ฐ€ ์ผ๋ถ€ ์ค‘์š”ํ•œ ๋ถ€๋ถ„๊ณผ ๊ณตํ†ต ๋ฌธ์ œ์— ์ ์šฉ๋˜๊ณ  ์ผ๋ฐ˜์ ์ธ ์ตœ์ ํ™”๋Š” ๋งค์šฐ ์–ด๋ ต๊ธฐ ๋•Œ๋ฌธ์ž…๋‹ˆ๋‹ค.

๊ทธ๋ž˜์„œ RxJava ๋‚ด๋ถ€์˜ ๋ชจ๋“  ๊ฒƒ์ด ํšจ์œจ์ ์ด๋‹ค ์ƒ๊ฐํ•˜๊ณ  ๊ธด Rx ์ฒด์ธ์„ ๋งŒ๋“ค์ง€ ๋งˆ์‹ญ์‹œ์š”. ๋‹น์‹ ์˜ ์ฝ”๋“œ๋ฅผ ๋ฒค์น˜๋งˆํฌ ํ•˜๊ณ  ์ค‘์š”ํ•œ ์ฒด์ธ์„ ํ”„๋กœํŒŒ์ผ๋งํ•ด์„œ ๊ฐœ๋ณ„์ ์œผ๋กœ ์ตœ์ ํ™”ํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์„ ์ฐพ์œผ์„ธ์š”.

์•„๋ž˜์˜ ๊ธ€์„ ๊ฐ™์ด ์ฝ์œผ์„ธ์š”.

ํ–‰๋ณตํ•œ ์ฝ”๋”ฉํ•˜์„ธ์š”.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment