Last active
July 9, 2024 22:35
-
-
Save zawadz88/ec55d275f2dd878f595ea1bfe27b59af to your computer and use it in GitHub Desktop.
Tests for disposing Disposables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import io.reactivex.Completable | |
import io.reactivex.observers.TestObserver | |
import io.reactivex.schedulers.Schedulers | |
import org.junit.Assert.assertEquals | |
import org.junit.Rule | |
import org.junit.Test | |
import java.util.concurrent.Executors | |
/**
 * Records which operators of a [Completable] chain still run when the [TestObserver]
 * is disposed from a background thread while upstream work is blocking.
 *
 * [TestSchedulerRule] redirects `Schedulers.io()` to `rule.testScheduler`, so nothing runs
 * until [triggerActions] is called. Each test therefore first schedules a delayed dispose on
 * another thread and then triggers the chain, which blocks in `Thread.sleep` long enough for
 * that dispose to land in the middle of the chain.
 *
 * The tests are timing based; the "Output" comment at the end of each test is the console
 * output the original author recorded.
 */
class CompletableOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Number of instrumented callbacks that have run in the current test. */
    private var counter: Int = 0

    @Test
    fun `Emission cancelled right after fromCallable()`() {
        val testObserver = Completable
            .fromCallable {
                println("fromCallable")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~50 ms, while fromCallable is still sleeping.
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver.assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled right after andThen()`() {
        val testObserver = Completable
            .fromCallable {
                println("fromCallable")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete1")
            }
            // NOTE(review): this trailing lambda is passed as the next source itself (SAM
            // conversion), so the Completable.complete() created inside it looks like it is
            // never subscribed and downstream never receives onComplete from it — confirm
            // that this is intended.
            .andThen {
                counter++
                println("andThen")
                Thread.sleep(100L)
                Completable.complete()
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while the andThen lambda is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete1
        andThen
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission disposing in doOnComplete() should still get the value to subscriber if next operator is doOnComplete()`() {
        val testObserver = Completable.complete()
            .andThen(
                Completable.fromCallable {
                    println("fromCallable")
                    Thread.sleep(100L)
                }
            )
            .doOnComplete {
                counter++
                println("doOnComplete1")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while the first doOnComplete is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete1
        disposing
        doOnDispose: observable has been disposed
        doOnComplete2
        */
    }

    @Test
    fun `Emission cancelled after disposing in doOnComplete() after andThen()`() {
        val testObserver = Completable
            .fromCallable {
                println("fromCallable")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
                Thread.sleep(100L)
            }
            // NOTE(review): as above, the lambda itself acts as the next source and the
            // Completable.complete() it creates appears to be discarded — confirm intent.
            .andThen {
                counter++
                println("andThen")
                Completable.complete()
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while doOnComplete is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete
        disposing
        doOnDispose: observable has been disposed
        andThen
        E.g. up until RxJava 2.2.5 "andThen" would not be printed, since 2.2.6 it would (probably due to: https://github.com/ReactiveX/RxJava/pull/6362)
        */
    }

    @Test
    fun `Emission disposing in doOnComplete() should still get the value to subscriber if next operator is toSingle()`() {
        val testObserver = Completable
            .fromCallable {
                println("fromCallable")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
                Thread.sleep(100L)
            }
            .toSingle {
                counter++
                println("toSingle")
                1
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while doOnComplete is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertValue(1)
            .assertComplete()
        assertCancelled(testObserver)
        assertCounterValue(3)
        /* Output:
        fromCallable
        doOnComplete
        disposing
        doOnDispose: observable has been disposed
        toSingle
        doOnSuccess
        */
    }

    /** Runs everything currently queued on the rule's test scheduler. */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still runs
     * already-submitted tasks but lets the worker thread terminate afterwards, so the tests
     * do not leave one idle thread behind per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * This replaces `assertOf { it.isCancelled }`, which only evaluated the flag and threw the
     * result away, so it could never fail.
     */
    private fun assertCancelled(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] instrumented callbacks have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import io.reactivex.Observable | |
import io.reactivex.Single | |
import io.reactivex.observers.TestObserver | |
import io.reactivex.schedulers.Schedulers | |
import org.junit.Assert.assertEquals | |
import org.junit.Rule | |
import org.junit.Test | |
import java.util.concurrent.Executors | |
/**
 * Records which operators of an [Observable] chain still run when the [TestObserver]
 * is disposed from a background thread while the first upstream item is being produced.
 *
 * [TestSchedulerRule] redirects `Schedulers.io()` to `rule.testScheduler`, so nothing runs
 * until [triggerActions] is called. Every test schedules a dispose 50 ms in and then triggers
 * the chain, whose first item takes 100 ms to produce (see [delayedSource]).
 *
 * The tests are timing based; the "Output" comment at the end of each test is the console
 * output the original author recorded.
 */
class ObservableOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Number of instrumented callbacks that have run in the current test. */
    private var counter: Int = 0

    // Despite its name, this test does dispose the observer 50 ms in; it records that the
    // first value still travels through map / doOnNext / flatMap and reaches the observer.
    @Test
    fun `Uninterrupted emission`() {
        val testObserver = delayedSource()
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnNext {
                counter++
                println("doOnNext -> value: $it")
            }
            .flatMap {
                counter++
                println("flatMap -> value: $it")
                Observable.just(it)
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertValues(1)
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(3)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        map -> value: 1
        doOnNext -> value: 1
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after switchMap()`() {
        val testObserver = delayedSource()
            .switchMap {
                counter++
                println("switchMap -> value: $it")
                Observable.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        switchMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled before concatMap()`() {
        val testObserver = delayedSource()
            .concatMap {
                counter++
                println("concatMap -> value: $it")
                Observable.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled after flatMap with Single#just() + toObservable()`() {
        val testObserver = delayedSource()
            .flatMap {
                counter++
                println("flatMap with toObservable() -> value: $it")
                Single.just(it).toObservable()
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap with toObservable() -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after flatMapSingle()`() {
        val testObserver = delayedSource()
            .flatMapSingle {
                counter++
                println("flatMapSingle -> value: $it")
                Single.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMapSingle -> value: 1
        */
    }

    @Test
    fun `Emission cancelled before create()`() {
        val testObserver = delayedSource()
            .flatMap { value ->
                counter++
                println("flatMap -> value: $value")
                Observable.create<Int> { sbr ->
                    println("create -> value: $value")
                    sbr.onNext(value)
                    sbr.onComplete()
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    /**
     * Upstream shared by every test: the values 1..6, each mapped through a `fromCallable`
     * that prints the value and blocks for 100 ms before emitting it.
     */
    private fun delayedSource(): Observable<Int> =
        Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap { value ->
                Observable.fromCallable {
                    println("fromCallable -> value: $value")
                    Thread.sleep(100L)
                    value
                }
            }

    /** Runs everything currently queued on the rule's test scheduler. */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still runs
     * already-submitted tasks but lets the worker thread terminate afterwards, so the tests
     * do not leave one idle thread behind per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * This replaces `assertOf { it.isCancelled }`, which only evaluated the flag and threw the
     * result away, so it could never fail.
     */
    private fun assertCancelled(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] instrumented callbacks have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import io.reactivex.Completable | |
import io.reactivex.Single | |
import io.reactivex.observers.TestObserver | |
import io.reactivex.schedulers.Schedulers | |
import org.junit.Assert.assertEquals | |
import org.junit.Rule | |
import org.junit.Test | |
import java.util.concurrent.Executors | |
/**
 * Records which operators of a [Single] chain still run when the [TestObserver]
 * is disposed from a background thread while upstream work is blocking.
 *
 * [TestSchedulerRule] redirects `Schedulers.io()` to `rule.testScheduler`, so nothing runs
 * until [triggerActions] is called. Each test therefore first schedules a delayed dispose on
 * another thread and then triggers the chain, which blocks in `Thread.sleep` long enough for
 * that dispose to land in the middle of the chain.
 *
 * The tests are timing based; the "Output" comment at the end of each test is the console
 * output the original author recorded.
 */
class SingleOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Number of instrumented callbacks that have run in the current test. */
    private var counter: Int = 0

    @Test
    fun `Emission cancelled right after fromCallable()`() {
        val testObserver = Single
            .fromCallable {
                println("fromCallable")
                Thread.sleep(100L)
                1
            }
            .map {
                counter++
                println("map -> value: $it")
                it
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~50 ms, while fromCallable is still sleeping.
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled right after flatMap()`() {
        val testObserver = delayedSingle()
            .map {
                counter++
                println("map -> value: $it")
                it
            }
            .flatMap {
                counter++
                Thread.sleep(100)
                println("flatMap -> value: $it")
                Single.just(it)
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while the second flatMap is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission disposing in map() should still get the value to subscriber if next operator is doOnSuccess()`() {
        val testObserver = delayedSingle()
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while map is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertValue(1)
            .assertComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        doOnSuccess -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after disposing in map() after flatMap()`() {
        val testObserver = delayedSingle()
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .flatMap {
                counter++
                println("flatMap -> value: $it")
                Single.just(it)
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while map is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after disposing in map() after flatMapCompletable()`() {
        val testObserver = delayedSingle()
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .flatMapCompletable {
                counter++
                println("flatMapCompletable -> value: $it")
                Completable.complete()
            }
            .toSingle {
                counter++
                1
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()
        testObserver.assertNoValues()

        // Dispose lands at ~150 ms, while map is sleeping.
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertCancelled(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMapCompletable -> value: 1
        */
    }

    /**
     * Upstream shared by the flatMap-based tests: `Single.just(1)` mapped through a
     * `fromCallable` that prints the value and blocks for 100 ms before emitting it.
     */
    private fun delayedSingle(): Single<Int> =
        Single.just(1)
            .flatMap { value ->
                Single.fromCallable {
                    println("fromCallable -> value: $value")
                    Thread.sleep(100L)
                    value
                }
            }

    /** Runs everything currently queued on the rule's test scheduler. */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still runs
     * already-submitted tasks but lets the worker thread terminate afterwards, so the tests
     * do not leave one idle thread behind per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * This replaces `assertOf { it.isCancelled }`, which only evaluated the flag and threw the
     * result away, so it could never fail.
     */
    private fun assertCancelled(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] instrumented callbacks have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package test | |
import io.reactivex.Scheduler | |
import io.reactivex.android.plugins.RxAndroidPlugins | |
import io.reactivex.functions.Function | |
import io.reactivex.plugins.RxJavaPlugins | |
import io.reactivex.schedulers.TestScheduler | |
import org.junit.rules.TestRule | |
import org.junit.runner.Description | |
import org.junit.runners.model.Statement | |
/**
 * JUnit rule that, for the duration of a test, makes RxJava's io, computation and
 * new-thread schedulers and RxAndroid's main-thread scheduler all resolve to a single
 * [TestScheduler].
 */
class TestSchedulerRule : TestRule {

    /**
     * Scheduler installed while the test runs. It is replaced by a fresh instance once the
     * wrapped statement has finished.
     */
    var testScheduler = TestScheduler()
        private set

    override fun apply(base: Statement, d: Description): Statement = object : Statement() {
        @Throws(Throwable::class)
        override fun evaluate() {
            // One handler is enough: every hook has to return the same scheduler.
            val toTestScheduler = alwaysReturn(testScheduler)
            RxJavaPlugins.setIoSchedulerHandler(toTestScheduler)
            RxJavaPlugins.setComputationSchedulerHandler(toTestScheduler)
            RxJavaPlugins.setNewThreadSchedulerHandler(toTestScheduler)
            RxAndroidPlugins.setMainThreadSchedulerHandler(toTestScheduler)

            try {
                base.evaluate()
            } finally {
                // Remove the overrides even when the test fails, then start from a clean scheduler.
                RxJavaPlugins.reset()
                RxAndroidPlugins.reset()
                testScheduler = TestScheduler()
            }
        }
    }

    /** Builds a plugin handler that ignores the scheduler it is given and returns [replacement]. */
    private fun alwaysReturn(replacement: Scheduler): Function<Scheduler, Scheduler> =
        Function { replacement }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment