Skip to content

Instantly share code, notes, and snippets.

@ShinichiroFunatsu
Last active January 10, 2020 02:19
Show Gist options
  • Save ShinichiroFunatsu/ab0dfaaef0619147f1b281dccdaad88f to your computer and use it in GitHub Desktop.
Save ShinichiroFunatsu/ab0dfaaef0619147f1b281dccdaad88f to your computer and use it in GitHub Desktop.
RxJava Composition Sample
/**
 * Prints [str] to standard output, prefixed with the name of the calling thread
 * in square brackets, e.g. `[main] step 0`.
 *
 * @param str value to print; it is rendered through string interpolation.
 */
private fun println2(str: Any) {
    val threadName = Thread.currentThread().name
    println("[$threadName] $str")
}
/**
 * Shows which thread each stage of a `Single` -> `Completable` chain runs on when
 * several `subscribeOn` / `observeOn` operators are stacked.
 *
 * Every stage logs through [println2], so each printed line carries the name of the
 * thread that executed it. The thread names quoted in the comments below come from
 * the run recorded at the end of this function.
 */
fun rxJavaSample1() {
    println2("step 0")

    // Source: emits the single value 1 and logs when it does so.
    val source = Single
        .create<Int> { emitter ->
            println2("step 2")
            emitter.onSuccess(1)
        }
        .doOnSuccess { println2("success") }

    // Maps the emitted value to a Completable that only logs; the value itself is ignored.
    val toPrintStep: (Int) -> Completable = { _ ->
        Completable.create { emitter ->
            println2("step 3")
            emitter.onComplete()
        }
    }

    source
        // Source body ("step 2" / "success") was recorded on a computation thread.
        .subscribeOn(Schedulers.computation())
        // Recorded on the io thread requested by the subscribeOn(io) further down.
        .doOnSubscribe { println2("step 1-3") }
        // "step 3" was recorded on a second computation thread.
        .observeOn(Schedulers.computation())
        .flatMapCompletable(toPrintStep)
        .subscribeOn(Schedulers.io())
        // Recorded on the single-scheduler thread requested by subscribeOn(single) below.
        .doOnSubscribe { println2("step 1-2") }
        // Completion ("step 4" / "complete") was recorded on an io thread.
        .observeOn(Schedulers.io())
        .subscribeOn(Schedulers.single())
        // Recorded on the calling thread (main).
        .doOnSubscribe { println2("step 1-1") }
        .doOnComplete { println2("step 4") }
        .subscribe { println2("complete") }

    // Output recorded by the author:
    // [main] step 0
    // [main] step 1-1
    // [RxSingleScheduler-1] step 1-2
    // [RxCachedThreadScheduler-1] step 1-3
    // [RxComputationThreadPool-1] step 2
    // [RxComputationThreadPool-1] success
    // [RxComputationThreadPool-2] step 3
    // [RxCachedThreadScheduler-1] step 4
    // [RxCachedThreadScheduler-1] complete
}
/**
 * Variant of the threading sample in which the source `Single` and the inner
 * `Completable` each carry their own `subscribeOn`, and the outer chain only
 * switches the completion thread with `observeOn`.
 *
 * Every stage logs through [println2], so each printed line carries the name of the
 * thread that executed it.
 */
fun rxJavaSample2() {
    println2("step 0")

    // Source: emits 1; the author's posted run shows "step 2" and "success 1"
    // on the single-scheduler thread.
    val source = Single
        .create<Int> { emitter ->
            println2("step 2")
            emitter.onSuccess(1)
        }
        .doOnSuccess { value -> println2("success $value") }
        .subscribeOn(Schedulers.single())
    // Left disabled by the author:
    // .doOnSubscribe{ println2("step 1") }

    source
        .flatMapCompletable { value ->
            // Inner step asks for a computation thread on its own.
            Completable
                .create { emitter ->
                    println2("step 3: $value")
                    emitter.onComplete()
                }
                .subscribeOn(Schedulers.computation())
        }
        // "step 4" and "complete" were reported on an io thread.
        .observeOn(Schedulers.io())
        // Reported on the calling thread (main).
        .doOnSubscribe { println2("step 1") }
        .doOnComplete { println2("step 4") }
        .subscribe { println2("complete") }

    // Output posted by the author:
    // [main] step 0
    // [main] step 1
    // [RxSingleScheduler-1] step 2
    // [RxSingleScheduler-1] success 1
    // [RxComputationThreadPool-1] step 3: 1
    // [RxCachedThreadScheduler-1] step 4
    // [RxCachedThreadScheduler-1] complete
}
/**
 * Composes two `Completable`s sequentially: first prints the odd numbers of the
 * range 0..4, then the even ones.
 *
 * Output posted by the author: 1, 3, 0, 2, 4 (one number per line).
 */
fun rxJavaSample3() {
    val numbers = Flowable.range(0, 5)

    // Wraps printing of a single number into a Completable that completes right after.
    val printNum = { i: Int ->
        Completable.create { emitter ->
            println(i)
            emitter.onComplete()
        }
    }

    val oddPrint = numbers
        .filter { it % 2 != 0 }
        .flatMapCompletable(printNum)
    val evenPrint = numbers
        .filter { it % 2 == 0 }
        .flatMapCompletable(printNum)

    // andThen runs the even branch only after the odd branch has completed.
    oddPrint
        .andThen(Completable.defer { evenPrint })
        .subscribe()
}
@ShinichiroFunatsu
Copy link
Author

ShinichiroFunatsu commented Jan 9, 2020

rxJavaSample1

[main] step 0
[main] step 1-1
[RxSingleScheduler-1] step 1-2
[RxCachedThreadScheduler-1] step 1-3
[RxComputationThreadPool-1] step 2
[RxComputationThreadPool-1] success
[RxComputationThreadPool-2] step 3
[RxCachedThreadScheduler-1] step 4
[RxCachedThreadScheduler-1] complete

rxJavaSample2

[main] step 0
[main] step 1
[RxSingleScheduler-1] step 2
[RxSingleScheduler-1] success 1
[RxComputationThreadPool-1] step 3: 1
[RxCachedThreadScheduler-1] step 4
[RxCachedThreadScheduler-1] complete

rxJavaSample3

1
3
0
2
4

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment