Skip to content

Instantly share code, notes, and snippets.

@neworld
Created March 9, 2021 09:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.
Save neworld/3f7d5b2d2ecb9aaee156f41684de0ff1 to your computer and use it in GitHub Desktop.
RxJava operator written in Kotlin that breaks the dispose chain
package com.vinted.feature.base.ui.rx
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
/**
 * Breaks the dispose chain: disposing the downstream subscription never disposes [upstream].
 *
 * Use this when the upstream must be executed no matter what happens downstream,
 * for example a network call that has to be delivered. Once the downstream disposes,
 * upstream events are simply no longer forwarded to it.
 */
class BreakDisposeChainObservable<T : Any>(private val upstream: Observable<T>) : Observable<T>() {

    override fun subscribeActual(observer: Observer<in T>) {
        upstream.subscribe(BreakDisposeChainObserver(observer))
    }

    /**
     * Forwards upstream events to the downstream [Observer] until [dispose] is called.
     *
     * It hands itself to the downstream as the [Disposable], so a downstream dispose only
     * drops the reference to the downstream observer; the upstream disposable is never touched.
     */
    class BreakDisposeChainObserver<T>(observer: Observer<T>) : Observer<T>, Disposable {

        // Null once disposed. Presumably @Volatile so a dispose() issued from another
        // thread becomes visible to the thread delivering upstream events.
        @Volatile
        private var observer: Observer<T>? = observer

        override fun onSubscribe(disposable: Disposable) {
            // The upstream disposable is deliberately ignored: the downstream receives `this`,
            // whose dispose() only detaches the downstream. A safe call (instead of `!!`) keeps
            // this consistent with the other callbacks if the observer was already detached.
            observer?.onSubscribe(this)
        }

        override fun onComplete() {
            observer?.onComplete()
        }

        override fun onNext(value: T) {
            observer?.onNext(value)
        }

        override fun onError(error: Throwable) {
            observer?.onError(error)
        }

        /** Returns `true` once the downstream observer has been detached via [dispose]. */
        override fun isDisposed() = observer == null

        /** Detaches the downstream observer; the upstream subscription stays active. */
        override fun dispose() {
            observer = null
        }
    }
}
/**
 * Wraps this [Observable] in a [BreakDisposeChainObservable], so that disposing the
 * returned observable's subscription does not dispose this one.
 */
fun <T : Any> Observable<T>.breakDisposeChain(): Observable<T> =
    BreakDisposeChainObservable(this)
package com.vinted.data.rx.api
import com.vinted.feature.base.ui.rx.BreakDisposeChainObservable
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.subjects.PublishSubject
import org.junit.Test
import kotlin.test.assertTrue
/**
 * Tests for [BreakDisposeChainObservable]: events pass through while the downstream is
 * subscribed, are dropped once it has disposed, and the upstream keeps its observer.
 */
class BreakDisposeChainObservableTest {

    @Test
    fun complete_notDisposed_pass() {
        val source = Observable.empty<Unit>()

        BreakDisposeChainObservable(source)
            .test()
            .assertComplete()
    }

    @Test
    fun next_notDisposed_pass() {
        val source = Observable.just(5)

        BreakDisposeChainObservable(source)
            .test()
            .assertValue(5)
    }

    @Test
    fun error_notDisposed_pass() {
        val failure = RuntimeException("test")
        val source = Observable.error<Unit>(failure)

        BreakDisposeChainObservable(source)
            .test()
            .assertError(failure)
    }

    @Test
    fun dispose_upstreamShouldBeNot_disposed() {
        val subject = PublishSubject.create<Unit>()

        val subscription = BreakDisposeChainObservable(subject).subscribe()
        subscription.dispose()

        // The subject still holds the wrapper observer, i.e. the upstream was not disposed.
        assertTrue(subject.hasObservers())
    }

    @Test
    fun complete_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val disposedObserver = subscribeThenDispose(subject)

        subject.onComplete()

        disposedObserver.assertNotComplete()
    }

    @Test
    fun next_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val disposedObserver = subscribeThenDispose(subject)

        subject.onNext(Unit)

        disposedObserver.assertNoValues()
    }

    @Test
    fun error_disposed_notPass() {
        val subject = PublishSubject.create<Unit>()
        val disposedObserver = subscribeThenDispose(subject)

        subject.onError(RuntimeException())

        disposedObserver.assertNoErrors()
    }

    @Test
    fun dispose_isDisposed() {
        val tracker = ParentDisposableDelegate()
        BreakDisposeChainObservable(Observable.never<Unit>()).subscribe(tracker)

        tracker.dispose()

        assertTrue(tracker.isDisposed)
    }

    // Subscribes a test observer through the operator under test and disposes it right away.
    private fun <T : Any> subscribeThenDispose(source: Observable<T>) =
        BreakDisposeChainObservable(source).test().also { it.dispose() }

    /**
     * Observer that ignores every event and delegates its [Disposable] behaviour to the
     * disposable it receives in [onSubscribe].
     */
    class ParentDisposableDelegate : Observer<Unit>, Disposable {

        private lateinit var parentDisposable: Disposable

        override fun onSubscribe(disposable: Disposable) {
            parentDisposable = disposable
        }

        override fun onNext(p0: Unit) = Unit

        override fun onError(p0: Throwable) = Unit

        override fun onComplete() = Unit

        override fun dispose() {
            parentDisposable.dispose()
        }

        override fun isDisposed() = parentDisposable.isDisposed
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment