Skip to content

Instantly share code, notes, and snippets.

@zawadz88
Last active October 3, 2023 07:39
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zawadz88/ec55d275f2dd878f595ea1bfe27b59af to your computer and use it in GitHub Desktop.
Save zawadz88/ec55d275f2dd878f595ea1bfe27b59af to your computer and use it in GitHub Desktop.
Tests for disposing Disposables
package test
import io.reactivex.Completable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.Executors
/**
 * Characterisation tests showing which callbacks of a [Completable] chain still run after the
 * downstream [TestObserver] has been disposed.
 *
 * Each test subscribes on `Schedulers.io()` (which [TestSchedulerRule] redirects to its test
 * scheduler), schedules a delayed `dispose()` on a separate thread and then drains the test
 * scheduler via [triggerActions]. The `Thread.sleep` calls inside the chains exist so that the
 * delayed dispose lands at a known point of the chain; the tests are therefore timing-sensitive.
 */
class CompletableOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Incremented by the counted callbacks of the chain under test. */
    private var counter: Int = 0

    @Test
    fun `Emission cancelled right after fromCallable()`() {
        val testObserver = Completable.fromCallable {
            println("fromCallable")
            Thread.sleep(100L)
        }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 50 ms, i.e. while fromCallable is still inside its 100 ms sleep.
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver.assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled right after andThen()`() {
        val testObserver = Completable.fromCallable {
            println("fromCallable")
            Thread.sleep(100L)
        }
            .doOnComplete {
                counter++
                println("doOnComplete1")
            }
            // NOTE(review): this trailing lambda presumably resolves to andThen(CompletableSource),
            // i.e. the lambda is the subscribe body and never signals its observer; the
            // Completable.complete() expression is discarded. If so, downstream never completes
            // whether or not the observer is disposed - confirm this is intended.
            .andThen {
                counter++
                println("andThen")
                Thread.sleep(100L)
                Completable.complete()
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the 100 ms sleep inside andThen (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete1
        andThen
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission disposing in doOnComplete() should still get the value to subscriber if next operator is doOnComplete()`() {
        val testObserver = Completable.complete()
            .andThen(
                Completable.fromCallable {
                    println("fromCallable")
                    Thread.sleep(100L)
                }
            )
            .doOnComplete {
                counter++
                println("doOnComplete1")
                Thread.sleep(100L)
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside the first doOnComplete (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete1
        disposing
        doOnDispose: observable has been disposed
        doOnComplete2
        */
    }

    @Test
    fun `Emission cancelled after disposing in doOnComplete() after andThen()`() {
        val testObserver = Completable.fromCallable {
            println("fromCallable")
            Thread.sleep(100L)
        }
            .doOnComplete {
                counter++
                println("doOnComplete")
                Thread.sleep(100L)
            }
            // NOTE(review): as above, this lambda presumably becomes a CompletableSource that
            // never signals completion, so assertNotComplete() may hold regardless of disposal.
            .andThen {
                counter++
                println("andThen")
                Completable.complete()
            }
            .doOnComplete {
                counter++
                println("doOnComplete2")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside doOnComplete (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable
        doOnComplete
        disposing
        doOnDispose: observable has been disposed
        andThen
        E.g. up until RxJava 2.2.5 "andThen" would not be printed, since 2.2.6 it would (probably due to: https://github.com/ReactiveX/RxJava/pull/6362)
        */
    }

    @Test
    fun `Emission disposing in doOnComplete() should still get the value to subscriber if next operator is toSingle()`() {
        val testObserver = Completable.fromCallable {
            println("fromCallable")
            Thread.sleep(100L)
        }
            .doOnComplete {
                counter++
                println("doOnComplete")
                Thread.sleep(100L)
            }
            .toSingle {
                counter++
                println("toSingle")
                1
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside doOnComplete (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertValue(1)
            .assertComplete()
        assertDisposed(testObserver)
        assertCounterValue(3)
        /* Output:
        fromCallable
        doOnComplete
        disposing
        doOnDispose: observable has been disposed
        toSingle
        doOnSuccess
        */
    }

    /** Runs the work queued on the rule's test scheduler (the subscription scheduled by `subscribeOn`). */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still lets the
     * already-submitted task run, but releases the worker thread afterwards instead of leaking
     * one executor thread per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * Replaces `assertOf { it.isCancelled }`, which only evaluated the flag and discarded the
     * result, so it could never fail.
     */
    private fun assertDisposed(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] counted callbacks have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
package test
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.Executors
/**
 * Characterisation tests showing which operators of an [Observable] chain still run after the
 * downstream [TestObserver] has been disposed.
 *
 * Each test subscribes on `Schedulers.io()` (which [TestSchedulerRule] redirects to its test
 * scheduler), schedules a `dispose()` on a separate thread 50 ms later and then drains the test
 * scheduler via [triggerActions]. The first `fromCallable` sleeps for 100 ms, so the dispose is
 * expected to land while the first element is being produced; the tests are timing-sensitive.
 */
class ObservableOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Incremented by the counted operators of the chain under test. */
    private var counter: Int = 0

    @Test
    fun `Uninterrupted emission`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnNext {
                counter++
                println("doOnNext -> value: $it")
            }
            .flatMap {
                counter++
                println("flatMap -> value: $it")
                Observable.just(it)
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        // The first element is expected to travel through map, doOnNext and flatMap to the observer.
        testObserver
            .assertValues(1)
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(3)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        map -> value: 1
        doOnNext -> value: 1
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after switchMap()`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .switchMap {
                counter++
                println("switchMap -> value: $it")
                Observable.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        switchMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled before concatMap()`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .concatMap {
                counter++
                println("concatMap -> value: $it")
                Observable.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled after flatMap with Single#just() + toObservable()`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .flatMap {
                counter++
                println("flatMap with toObservable() -> value: $it")
                Single.just(it).toObservable()
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap with toObservable() -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after flatMapSingle()`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .flatMapSingle {
                counter++
                println("flatMapSingle -> value: $it")
                Single.just(it)
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMapSingle -> value: 1
        */
    }

    @Test
    fun `Emission cancelled before create()`() {
        val testObserver = Observable.just(1, 2, 3, 4, 5, 6)
            .flatMap {
                Observable.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .flatMap { value ->
                counter++
                println("flatMap -> value: $value")
                Observable.create<Int> { sbr ->
                    println("create -> value: $value")
                    sbr.onNext(value)
                    sbr.onComplete()
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                it * it
            }
            .doOnComplete {
                counter++
                println("doOnComplete")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(1)
        /* Output:
        fromCallable -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    /** Runs the work queued on the rule's test scheduler (the subscription scheduled by `subscribeOn`). */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still lets the
     * already-submitted task run, but releases the worker thread afterwards instead of leaking
     * one executor thread per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * Replaces `assertOf { it.isCancelled }`, which only evaluated the flag and discarded the
     * result, so it could never fail.
     */
    private fun assertDisposed(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] counted operators have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
package test
import io.reactivex.Completable
import io.reactivex.Single
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.Executors
/**
 * Characterisation tests showing which operators of a [Single] chain still run after the
 * downstream [TestObserver] has been disposed.
 *
 * Each test subscribes on `Schedulers.io()` (which [TestSchedulerRule] redirects to its test
 * scheduler), schedules a delayed `dispose()` on a separate thread and then drains the test
 * scheduler via [triggerActions]. The `Thread.sleep` calls inside the chains exist so that the
 * delayed dispose lands at a known point of the chain; the tests are therefore timing-sensitive.
 */
class SingleOperatorsTest {

    @get:Rule
    val rule = TestSchedulerRule()

    /** Incremented by the counted operators of the chain under test. */
    private var counter: Int = 0

    @Test
    fun `Emission cancelled right after fromCallable()`() {
        val testObserver = Single.fromCallable {
            println("fromCallable")
            Thread.sleep(100L)
            1
        }
            .map {
                counter++
                println("map -> value: $it")
                it
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 50 ms, i.e. while fromCallable is still inside its 100 ms sleep.
        disposeObserverWithDelay(50L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(0)
        /* Output:
        fromCallable
        disposing
        doOnDispose: observable has been disposed
        */
    }

    @Test
    fun `Emission cancelled right after flatMap()`() {
        val testObserver = Single.just(1)
            .flatMap {
                Single.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                it
            }
            .flatMap {
                counter++
                Thread.sleep(100)
                println("flatMap -> value: $it")
                Single.just(it)
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside the second flatMap (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission disposing in map() should still get the value to subscriber if next operator is doOnSuccess()`() {
        val testObserver = Single.just(1)
            .flatMap {
                Single.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside map (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertValue(1)
            .assertComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        doOnSuccess -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after disposing in map() after flatMap()`() {
        val testObserver = Single.just(1)
            .flatMap {
                Single.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .flatMap {
                counter++
                println("flatMap -> value: $it")
                Single.just(it)
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside map (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMap -> value: 1
        */
    }

    @Test
    fun `Emission cancelled after disposing in map() after flatMapCompletable()`() {
        val testObserver = Single.just(1)
            .flatMap {
                Single.fromCallable {
                    println("fromCallable -> value: $it")
                    Thread.sleep(100L)
                    it
                }
            }
            .map {
                counter++
                println("map -> value: $it")
                Thread.sleep(100)
                it
            }
            .flatMapCompletable {
                counter++
                println("flatMapCompletable -> value: $it")
                Completable.complete()
            }
            .toSingle {
                counter++
                1
            }
            .doOnSuccess {
                counter++
                println("doOnSuccess -> value: $it")
            }
            .doOnDispose { println("doOnDispose: observable has been disposed") }
            .subscribeOn(Schedulers.io())
            .test()

        testObserver.assertNoValues()
        // Dispose after 150 ms, i.e. during the sleep inside map (100-200 ms).
        disposeObserverWithDelay(150L, testObserver)
        triggerActions()

        testObserver
            .assertNoValues()
            .assertNotComplete()
        assertDisposed(testObserver)
        assertCounterValue(2)
        /* Output:
        fromCallable -> value: 1
        map -> value: 1
        disposing
        doOnDispose: observable has been disposed
        flatMapCompletable -> value: 1
        */
    }

    /** Runs the work queued on the rule's test scheduler (the subscription scheduled by `subscribeOn`). */
    private fun triggerActions() = rule.testScheduler.triggerActions()

    /**
     * Disposes [testObserver] from a background thread after [delay] milliseconds.
     *
     * The executor is shut down right after the task is submitted: `shutdown()` still lets the
     * already-submitted task run, but releases the worker thread afterwards instead of leaking
     * one executor thread per call.
     */
    private fun disposeObserverWithDelay(delay: Long, testObserver: TestObserver<*>) {
        val executor = Executors.newSingleThreadExecutor()
        executor.submit {
            Thread.sleep(delay)
            println("disposing")
            testObserver.dispose()
        }
        executor.shutdown()
    }

    /**
     * Fails unless [testObserver] has been disposed.
     *
     * Replaces `assertOf { it.isCancelled }`, which only evaluated the flag and discarded the
     * result, so it could never fail.
     */
    private fun assertDisposed(testObserver: TestObserver<*>) {
        assertEquals("TestObserver should have been disposed", true, testObserver.isCancelled)
    }

    /** Fails unless exactly [expectedCounter] counted operators have run. */
    private fun assertCounterValue(expectedCounter: Int) {
        assertEquals("Invalid counter value", expectedCounter, counter)
    }
}
package test
import io.reactivex.Scheduler
import io.reactivex.android.plugins.RxAndroidPlugins
import io.reactivex.functions.Function
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.TestScheduler
import org.junit.rules.TestRule
import org.junit.runner.Description
import org.junit.runners.model.Statement
/**
 * JUnit rule that routes the RxJava io, computation and new-thread schedulers, as well as the
 * RxAndroid main-thread scheduler, to a single [TestScheduler] for the duration of a test.
 *
 * After the wrapped statement finishes (normally or by throwing) the plugin hooks are reset and
 * [testScheduler] is replaced with a fresh instance.
 */
class TestSchedulerRule : TestRule {

    /** Scheduler that the overridden schedulers resolve to while a test is running. */
    var testScheduler = TestScheduler()
        private set

    override fun apply(base: Statement, d: Description): Statement =
        object : Statement() {
            @Throws(Throwable::class)
            override fun evaluate() {
                installHandlers(testScheduler)
                try {
                    base.evaluate()
                } finally {
                    uninstallHandlers()
                }
            }
        }

    /** Registers hooks that ignore the default scheduler and always hand out [replacement]. */
    private fun installHandlers(replacement: Scheduler) {
        val alwaysReplacement = Function<Scheduler, Scheduler> { replacement }
        RxJavaPlugins.setIoSchedulerHandler(alwaysReplacement)
        RxJavaPlugins.setComputationSchedulerHandler(alwaysReplacement)
        RxJavaPlugins.setNewThreadSchedulerHandler(alwaysReplacement)
        RxAndroidPlugins.setMainThreadSchedulerHandler(alwaysReplacement)
    }

    /** Resets the plugin hooks and swaps in a new [TestScheduler]. */
    private fun uninstallHandlers() {
        RxJavaPlugins.reset()
        RxAndroidPlugins.reset()
        testScheduler = TestScheduler()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment