Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
LiveObservable: Android lifecycle aware wrapper for an RxJava Observable
import android.arch.lifecycle.Lifecycle
import android.arch.lifecycle.LifecycleObserver
import android.arch.lifecycle.LifecycleOwner
import android.arch.lifecycle.OnLifecycleEvent
import com.jakewharton.rxrelay.BehaviorRelay
import rx.Observable
import rx.android.schedulers.AndroidSchedulers
import rx.functions.Action1
import rx.subscriptions.CompositeSubscription
/**
 * Android lifecycle-aware wrapper for an RxJava [Observable] (the source).
 *
 * Multiple observers can register and all of them receive the same items, because items from the
 * source are fanned out through a single [BehaviorRelay].
 *
 * The source is subscribed only while at least one registered [LifecycleOwner] is in an active
 * lifecycle state (STARTED or RESUMED). Once no registered owner is active any more, the source
 * subscription is dropped so the source can free its resources; no further items are emitted until
 * an owner becomes active again.
 *
 * All bookkeeping is guarded by a monitor on the internal owner registry.
 */
class LiveObservable<T : Any>(private val source: Observable<T>) : LifecycleObserver {

    /** Observers registered per lifecycle owner; also the monitor guarding all mutable state below. */
    private val lifecycleOwners = mutableMapOf<LifecycleOwner, Set<Action1<T>>>()

    /** Holds the subscription to [source] while at least one owner is active. */
    private val internalSubscriptions = CompositeSubscription()

    /** Relay subscriptions of each owner's observers, grouped per owner so they can be dropped together. */
    private val externalSubscriptions = mutableMapOf<LifecycleOwner, CompositeSubscription>()

    /** Fan-out point between [source] and the registered observers; also backs [value]. */
    private val valueRelay: BehaviorRelay<T?> = BehaviorRelay.create()

    /**
     * Registers [observer] to receive items while [lifecycleOwner] is at least STARTED.
     *
     * - An owner that is already DESTROYED is ignored: no ON_DESTROY would ever arrive to clean it up.
     * - The first registration for an owner attaches this object as a lifecycle observer; the
     *   subscription itself is created by the ON_START callback.
     * - A further observer for an owner that is already registered is subscribed immediately when the
     *   owner is currently active, since no additional ON_START is expected for it.
     * - Registering the same [observer] twice for the same owner has no effect.
     *
     * @param lifecycleOwner owner whose lifecycle bounds the subscription
     * @param observer callback invoked for every non-null item passing through the relay
     */
    fun observeWhileStarted(lifecycleOwner: LifecycleOwner, observer: Action1<T>) {
        synchronized(lifecycleOwners) {
            val lifecycle = lifecycleOwner.lifecycle
            if (lifecycle.currentState == Lifecycle.State.DESTROYED) return

            val registered = lifecycleOwners[lifecycleOwner]
            if (registered == null) {
                // Registry entry must exist before addObserver, in case lifecycle events are
                // delivered synchronously during the call.
                lifecycleOwners[lifecycleOwner] = setOf(observer)
                lifecycle.addObserver(this)
            } else if (observer !in registered) {
                lifecycleOwners[lifecycleOwner] = registered + observer
                if (isActiveState(lifecycle)) {
                    subscribeToSourceObservable()
                    subscribeExternalObserver(lifecycleOwner, observer)
                }
            }
        }
    }

    /**
     * ON_START callback: makes sure the source is subscribed and connects every observer of
     * [lifecycleOwner] to the relay.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_START)
    fun subscribeForUpdates(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            subscribeToSourceObservable()
            subscribeExternalObservers(lifecycleOwner)
        }
    }

    /**
     * Subscribes the relay to [source] unless a source subscription is already held, so only the
     * first owner that becomes active triggers the subscription. Items are moved to the Android
     * main thread before they reach the relay.
     */
    private fun subscribeToSourceObservable() {
        if (!internalSubscriptions.hasSubscriptions()) {
            val subscription = source
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(valueRelay)
            internalSubscriptions.add(subscription)
        }
    }

    /**
     * Connects every observer registered for [activeLifecycleOwner] to the relay.
     *
     * Any subscriptions still held for the owner are dropped first, so an ON_START that arrives
     * after an observer was already subscribed directly cannot subscribe it a second time.
     */
    private fun subscribeExternalObservers(activeLifecycleOwner: LifecycleOwner) {
        val observers = lifecycleOwners[activeLifecycleOwner] ?: return
        externalSubscriptions[activeLifecycleOwner]?.clear()
        observers.forEach { observer -> subscribeExternalObserver(activeLifecycleOwner, observer) }
    }

    /** Subscribes a single [observer] to the relay and files the subscription under [owner]. */
    private fun subscribeExternalObserver(owner: LifecycleOwner, observer: Action1<T>) {
        val subscription = valueRelay
            .asObservable()
            .filterNotNull()
            .subscribe(observer)
        externalSubscriptions.getOrPut(owner) { CompositeSubscription() }.add(subscription)
    }

    /**
     * ON_STOP callback: drops the source subscription if no owner is active any more and
     * disconnects the observers of [lifecycleOwner] from the relay.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_STOP)
    fun unsubscribeForUpdates(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            unsubscribeFromSourceObservable()
            unsubscribeExternalObservers(lifecycleOwner)
        }
    }

    /**
     * Stops receiving items by unsubscribing from [source], but only when no registered owner is
     * in an active state.
     */
    private fun unsubscribeFromSourceObservable() {
        if (lifecycleOwners.keys.none { isActiveState(it.lifecycle) }) internalSubscriptions.clear()
    }

    /** Unsubscribes all relay subscriptions held for [inactiveLifecycleOwner]; the container is kept for reuse. */
    private fun unsubscribeExternalObservers(inactiveLifecycleOwner: LifecycleOwner) {
        externalSubscriptions[inactiveLifecycleOwner]?.clear()
    }

    /**
     * ON_DESTROY callback: detaches from the lifecycle of [lifecycleOwner] and forgets everything
     * stored for it, including its relay subscriptions, so the owner is no longer referenced here.
     */
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun unregisterObserver(lifecycleOwner: LifecycleOwner) {
        synchronized(lifecycleOwners) {
            lifecycleOwner.lifecycle.removeObserver(this)
            lifecycleOwners.remove(lifecycleOwner)
            // Drop the per-owner entry as well; otherwise the destroyed owner stays reachable as a map key.
            externalSubscriptions.remove(lifecycleOwner)?.unsubscribe()
        }
    }

    /** Returns the relay's current value, or `null` if it holds none. */
    fun value(): T? = valueRelay.value

    /** An owner counts as active when its lifecycle state is at least STARTED. */
    private fun isActiveState(lifecycle: Lifecycle) = lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)

    /**
     * Drops `null` items; the trailing identity `map` only narrows the static element type
     * from `R?` to `R`.
     */
    private fun <R : Any> Observable<R?>.filterNotNull(): Observable<R> = this.filter { it != null }.map { it }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment