Skip to content

Instantly share code, notes, and snippets.

@segfault87
Last active June 28, 2021 08:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save segfault87/b62816b9e6452df35617f76c269b5630 to your computer and use it in GitHub Desktop.
Save segfault87/b62816b9e6452df35617f76c269b5630 to your computer and use it in GitHub Desktop.
RxContext.kt
package com.example
import androidx.lifecycle.*
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.rxkotlin.addTo
/**
 * Contract for any component that owns a [CompositeDisposable] collecting its
 * Rx subscriptions.
 */
interface RxContextProvider {
    /** Container that subscriptions created for this component are added to. */
    val disposables: CompositeDisposable

    /**
     * Disposes [disposables] and, with it, every subscription it currently holds.
     *
     * NOTE(review): this calls `dispose()`, not `clear()`; per RxJava's
     * CompositeDisposable contract the container is not reusable afterwards.
     */
    fun disposeAll() = disposables.dispose()
}
/**
 * Base [ViewModel] that acts as an [RxContextProvider] and releases its
 * subscriptions when the ViewModel is cleared.
 */
open class RxContextViewModel : ViewModel(), RxContextProvider {
    override val disposables: CompositeDisposable = CompositeDisposable()

    /** Runs the superclass cleanup first, then disposes everything in [disposables]. */
    override fun onCleared() {
        super.onCleared()
        disposeAll()
    }
}
/**
 * Receiver scope that exposes `subscribe_` extensions for [Single], [Observable]
 * and [Completable]. Each `subscribe_` call subscribes to the source and adds the
 * resulting [Disposable] to [disposables], so the caller does not have to track it.
 *
 * The type parameters are intentionally NOT `reified`: nothing here inspects `T`
 * at runtime, and requiring reification would prevent calling these helpers from
 * generic code whose own type parameter is not reified.
 *
 * @property disposables container every subscription made through this scope is added to.
 */
class RxContextScope(val disposables: CompositeDisposable) {

    /** Subscribes, ignoring the emitted value, and registers the subscription. */
    fun <T> Single<T>.subscribe_(): Disposable =
        subscribe().addTo(disposables)

    /**
     * Subscribes with a success callback only and registers the subscription.
     * No error callback is supplied by this overload; use the two-argument
     * overload when the source may fail.
     *
     * @param onSuccess invoked with the emitted value.
     */
    inline fun <T> Single<T>.subscribe_(
        crossinline onSuccess: (arg: T) -> Unit,
    ): Disposable {
        // The explicit single parameter keeps the lambda a one-argument callback.
        return subscribe { t -> onSuccess(t) }.addTo(disposables)
    }

    /**
     * Subscribes with success and error callbacks and registers the subscription.
     *
     * @param onSuccess invoked with the emitted value.
     * @param onError invoked with the failure.
     */
    inline fun <T> Single<T>.subscribe_(
        crossinline onSuccess: (arg: T) -> Unit,
        crossinline onError: (throwable: Throwable) -> Unit,
    ): Disposable {
        return subscribe({ onSuccess(it) }, { onError(it) }).addTo(disposables)
    }

    /** Subscribes, ignoring emitted items, and registers the subscription. */
    fun <T> Observable<T>.subscribe_(): Disposable =
        subscribe().addTo(disposables)

    /**
     * Subscribes with an item callback only and registers the subscription.
     * No error callback is supplied by this overload; use the two-argument
     * overload when the source may fail.
     *
     * @param onSuccess invoked for each emitted item.
     */
    inline fun <T> Observable<T>.subscribe_(
        crossinline onSuccess: (arg: T) -> Unit,
    ): Disposable {
        return subscribe { onSuccess(it) }.addTo(disposables)
    }

    /**
     * Subscribes with item and error callbacks and registers the subscription.
     *
     * @param onSuccess invoked for each emitted item.
     * @param onError invoked with the failure.
     */
    inline fun <T> Observable<T>.subscribe_(
        crossinline onSuccess: (arg: T) -> Unit,
        crossinline onError: (throwable: Throwable) -> Unit,
    ): Disposable {
        return subscribe({ onSuccess(it) }, { onError(it) }).addTo(disposables)
    }

    /** Subscribes without callbacks and registers the subscription. */
    fun Completable.subscribe_(): Disposable =
        subscribe().addTo(disposables)

    /**
     * Subscribes with a completion callback only and registers the subscription.
     * Mirrors the success-only overloads available for [Single] and [Observable];
     * no error callback is supplied, so prefer the two-argument overload when the
     * source may fail.
     *
     * @param onComplete invoked when the source completes.
     */
    fun Completable.subscribe_(
        onComplete: () -> Unit,
    ): Disposable {
        return subscribe(onComplete).addTo(disposables)
    }

    /**
     * Subscribes with completion and error callbacks and registers the subscription.
     *
     * @param onComplete invoked when the source completes.
     * @param onError invoked with the failure.
     */
    fun Completable.subscribe_(
        onComplete: () -> Unit,
        onError: (throwable: Throwable) -> Unit,
    ): Disposable {
        return subscribe(onComplete, onError).addTo(disposables)
    }
}
/**
 * Runs [f] with a fresh [RxContextScope] as its receiver. The scope is backed by
 * this provider's [RxContextProvider.disposables], so `subscribe_` calls made
 * inside the block register their subscriptions there.
 *
 * @param f block executed with the scope as `this`.
 */
fun RxContextProvider.rx(f: RxContextScope.() -> Unit) {
    val scope = RxContextScope(disposables)
    scope.f()
}
/**
 * Registers a lifecycle observer on the receiver that calls
 * [RxContextProvider.disposeAll] when the lifecycle reports
 * [Lifecycle.Event.ON_DESTROY].
 *
 * Implemented with [LifecycleEventObserver] rather than the annotation-driven
 * `@OnLifecycleEvent`, which is deprecated and dispatched through reflection or
 * generated adapters.
 */
fun <T> T.installRxContext() where T : RxContextProvider, T : LifecycleOwner {
    lifecycle.addObserver(
        LifecycleEventObserver { _, event ->
            // Every other lifecycle event is irrelevant to subscription cleanup.
            if (event == Lifecycle.Event.ON_DESTROY) {
                disposeAll()
            }
        }
    )
}
///////////////////////////////////////////////////////////////////////////////
// How to use:
// 1. Make your activity/fragment implement the RxContextProvider interface.
// 2. In the activity/fragment, call installRxContext() during initialization.
// 3. Make your Rx calls inside an rx { } scope.
// 4. Subscribe with subscribe_() instead of subscribe().
// 5. For ViewModels, extend RxContextViewModel.
/**
 * Usage sample: an activity that implements [RxContextProvider], installs the
 * lifecycle hook in [onCreate], and subscribes inside an `rx { }` block.
 */
class TestActivity : AppCompatActivity(), RxContextProvider {
    // Container that receives the subscriptions made inside rx { } blocks.
    override val disposables = CompositeDisposable()

    private val myApi = MyApi()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // Ties disposal of `disposables` to this activity's ON_DESTROY event.
        installRxContext()
    }

    /** Issues the sample API call and subscribes through the Rx context scope. */
    fun doSomething() {
        rx {
            val request = myApi.callSomething()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())

            // Make sure to call subscribe_() instead of subscribe()
            request.subscribe_({
                // On success
            }, {
                // On error
            })
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment