Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Helper compose functions for RxKotlin
import android.annotation.SuppressLint
import com.blissapplications.kotlin.core.common.threads.ReactivexThreadSchedulerProvider
import io.reactivex.Completable
import io.reactivex.Single
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
/**
* Usage examples for these extension functions:
*
* private val compositeDisposable by lazy { CompositeDisposable() }
*
* Single.composeForLoading(schedulerProvider, compositeDisposable){ toggle -> toggleLoading(toggle)}
* Completable.composeForLoading(schedulerProvider, compositeDisposable){ toggle -> toggleLoading(toggle)}
*/
fun Disposable.addToComposite(bag: CompositeDisposable) {
    // Hand the receiver over to the container; the Boolean that add() reports
    // is intentionally not propagated, so this function stays Unit-returning.
    val self: Disposable = this
    bag.add(self)
}
/**
 * Returns a [Completable] that adds its subscription's [Disposable] to [bag]
 * when it is subscribed to.
 *
 * Rx operators do not modify the receiver; they return a new instance. The
 * previous version discarded that instance and returned `Unit`, so calling it
 * had no effect. Callers must subscribe to the returned [Completable].
 *
 * @param bag container that receives the disposable on every subscription.
 * @return the decorated [Completable].
 */
@Suppress("unused")
fun Completable.composeWithBag(bag: CompositeDisposable): Completable =
    doOnSubscribe { disposable -> disposable.addToComposite(bag) }
/**
 * Returns a [Completable] that tracks its subscription in [bag] and drives a
 * loading indicator through [toggleLoading].
 *
 * On subscription the disposable is added to [bag] and `toggleLoading(true)` is
 * invoked; downstream signals are then observed on `schedulerProvider.ui`, and
 * `toggleLoading(false)` is invoked from `doFinally`.
 *
 * Rx operators return a new instance instead of modifying the receiver. The
 * previous version discarded that instance and returned `Unit`, so the loading
 * callbacks were never attached to anything the caller subscribed to. Callers
 * must subscribe to the returned [Completable].
 *
 * @param schedulerProvider supplies the scheduler (`ui`) on which completion is observed.
 * @param bag container that receives the disposable on every subscription.
 * @param toggleLoading called with `true` on subscribe and `false` when the stream finishes.
 * @return the decorated [Completable].
 */
@Suppress("unused")
fun Completable.composeForLoading(
    schedulerProvider: ReactivexThreadSchedulerProvider,
    bag: CompositeDisposable,
    toggleLoading: (Boolean) -> Unit
): Completable =
    doOnSubscribe { disposable ->
        disposable.addToComposite(bag)
        // NOTE(review): this runs wherever the subscription happens, before the
        // observeOn below takes effect - confirm callers subscribe from the UI thread.
        toggleLoading(true)
    }
        .observeOn(schedulerProvider.ui)
        // NOTE(review): the disposable stored in the bag is taken above observeOn;
        // verify that clearing the bag still results in toggleLoading(false) where needed.
        .doFinally { toggleLoading(false) }
/**
 * Returns a [Single] that adds its subscription's [Disposable] to [bag] when it
 * is subscribed to.
 *
 * Rx operators do not modify the receiver; they return a new instance. The
 * previous version discarded that instance and returned `Unit`, so calling it
 * had no effect. Callers must subscribe to the returned [Single].
 *
 * @param bag container that receives the disposable on every subscription.
 * @return the decorated [Single], emitting the same item or error as the receiver.
 */
@Suppress("unused")
fun <T> Single<T>.composeWithBag(bag: CompositeDisposable): Single<T> =
    doOnSubscribe { disposable -> disposable.addToComposite(bag) }
/**
 * Returns a [Single] that tracks its subscription in [bag] and drives a loading
 * indicator through [toggleLoading].
 *
 * On subscription the disposable is added to [bag] and `toggleLoading(true)` is
 * invoked; downstream signals are then observed on `schedulerProvider.ui`, and
 * `toggleLoading(false)` is invoked from `doFinally`.
 *
 * Rx operators return a new instance instead of modifying the receiver. The
 * previous version discarded that instance and returned `Unit`, so the loading
 * callbacks were never attached to anything the caller subscribed to. Callers
 * must subscribe to the returned [Single].
 *
 * @param schedulerProvider supplies the scheduler (`ui`) on which the result is observed.
 * @param bag container that receives the disposable on every subscription.
 * @param toggleLoading called with `true` on subscribe and `false` when the stream finishes.
 * @return the decorated [Single], emitting the same item or error as the receiver.
 */
@Suppress("unused")
fun <T> Single<T>.composeForLoading(
    schedulerProvider: ReactivexThreadSchedulerProvider,
    bag: CompositeDisposable,
    toggleLoading: (Boolean) -> Unit
): Single<T> =
    doOnSubscribe { disposable ->
        disposable.addToComposite(bag)
        // NOTE(review): this runs wherever the subscription happens, before the
        // observeOn below takes effect - confirm callers subscribe from the UI thread.
        toggleLoading(true)
    }
        .observeOn(schedulerProvider.ui)
        // NOTE(review): the disposable stored in the bag is taken above observeOn;
        // verify that clearing the bag still results in toggleLoading(false) where needed.
        .doFinally { toggleLoading(false) }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment