Skip to content

Instantly share code, notes, and snippets.

@Takhion
Last active September 12, 2017 08:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Takhion/5c99828084af49baeb8b8a321959f2fa to your computer and use it in GitHub Desktop.
Save Takhion/5c99828084af49baeb8b8a321959f2fa to your computer and use it in GitHub Desktop.
import rx.Observable
import rx.subjects.BehaviorSubject
import SingletonObservable.ItemWrapper
/**
* Represents the current subscription status (either 'unsubscribed' or 'subscribed') of a [SingletonObservable].
* The index starts at [SingletonObservable.firstSubscriptionIndex] as 'unsubscribed', and every subsequent value alternates between the two states.
* Use [SingletonObservable.isSubscribed] to calculate the status for a specific [SubscriptionIndex].
*
* Note that a subscription index is constant for the lifetime of a stream, which is completed by the first time either one of
* [Observable.doOnTerminate] or [Observable.doOnUnsubscribe] is called.
*/
typealias SubscriptionIndex = Long
/**
* Exposes [observable] and [observableWithSubscription], which allow only one subscriber at a time to consume an observable created each time with
* [createObservable], while other subscribers will queue and wait their turn.
*
* In order to allow observers to coordinate over un/subscriptions, it also exposes:
* + [isCurrentlySubscribed]
* + [subscriptionIndex]
* + [subscriptionIndexObservable]
* + [observeUnsubscription]
* + [observeResubscription]
*/
class SingletonObservable<T>(
private val createObservable: () -> Observable<T>
) {
companion object {
/**
* @see [SubscriptionIndex]
*/
const val firstSubscriptionIndex: SubscriptionIndex = Long.MIN_VALUE
private const val unsubscribedRemainder = firstSubscriptionIndex % 2
/**
* @see [SubscriptionIndex]
*/
fun isSubscribed(subscriptionIndex: SubscriptionIndex) =
subscriptionIndex % 2 != unsubscribedRemainder
}
/**
* Used to synchronize access to the subscription.
*/
private val subscriptionLock = Any()
/**
* @see [SubscriptionIndex]
*/
@Volatile
var subscriptionIndex: SubscriptionIndex = firstSubscriptionIndex
private set
/**
* Emits the latest [subscriptionIndex] and every successive value.
*/
private val subscriptionIndexSubject: BehaviorSubject<SubscriptionIndex> =
BehaviorSubject.create(subscriptionIndex)
/**
* Emits the latest [subscriptionIndex] and every successive value.
* Honours backpressure by using [Observable.onBackpressureLatest].
*/
val subscriptionIndexObservable: Observable<SubscriptionIndex> =
subscriptionIndexSubject.onBackpressureLatest()
/**
* Returns whether any observer is currently consuming this [SingletonObservable].
*/
val isCurrentlySubscribed: Boolean
get() = isSubscribed(subscriptionIndex)
/**
* @see [SingletonObservable]
*/
val observable: Observable<T> =
singletonObservable(this::observable) { observable, _ -> observable }
/**
* @see [SingletonObservable]
*/
val observableWithSubscription: Observable<ItemWrapper<T, T>> =
singletonObservable(this::observableWithSubscription) { observable, subscriptionIndex ->
observable.map { item -> ItemWrapper(item, subscriptionIndex, this) }
}
/**
* Wraps [createObservable] using [subscriptionLock] to guard access to the un/subscriptions (see [SingletonObservable]).
*
* @param [selfReference] reference to the property that will hold the generated observable
* @param [transformObservable] transformation to be applied right after creating each new observable through [createObservable]
*/
private inline fun <R> singletonObservable(
crossinline selfReference: () -> Observable<R>,
crossinline transformObservable: (Observable<T>, SubscriptionIndex) -> Observable<R>
): Observable<R> =
Observable.defer<R> {
synchronized(subscriptionLock) {
val index = subscriptionIndex
if (isSubscribed(index)) {
selfReference().delaySubscription(observeUnsubscription(index))
}
else {
val newIndex = toggleSubscription()
transformObservable(createObservable(), newIndex)
.doOnTerminate { unsubscribe(newIndex) }
.doOnUnsubscribe { unsubscribe(newIndex) }
}
}
}
/**
* Unsubscribes from [subscriptionIndex] if it's not already unsubscribed.
*/
private fun unsubscribe(subscriptionIndex: SubscriptionIndex) {
synchronized(subscriptionLock) {
if (this.subscriptionIndex == subscriptionIndex) {
toggleSubscription()
}
}
}
/**
* Toggles between the 'unsubscribed'/'subscribed' subscription state by advancing the [subscriptionIndex].
*/
private fun toggleSubscription(): SubscriptionIndex {
val oldIndex = subscriptionIndex
return (oldIndex + 1).also { newIndex ->
subscriptionIndex = newIndex
subscriptionIndexSubject.onNext(newIndex)
}
}
/**
* Returns an [Observable] that emits nothing and completes when the subscription related to the provided [subscriptionIndex] is unsubscribed
* (or immediately if it already happened).
*
* It's generally easier and safer to use [ItemWrapper.observeUnsubscription].
*/
fun observeUnsubscription(subscriptionIndex: SubscriptionIndex) =
observeUntilSubscription(subscriptionIndex + 1)
/**
* Returns an [Observable] that emits nothing and completes when the subscription related to the provided [subscriptionIndex] is unsubscribed
* and followed by another subscription (or immediately if it already happened).
*
* It's generally easier and safer to use [ItemWrapper.observeResubscription].
*/
fun observeResubscription(subscriptionIndex: SubscriptionIndex) =
observeUntilSubscription(subscriptionIndex + 2)
/**
* Returns an [Observable] that emits nothing and completes when [subscriptionIndex] is reached (or immediately if it already happened).
*/
private fun observeUntilSubscription(subscriptionIndex: SubscriptionIndex): Observable<Nothing> =
subscriptionIndexSubject
.takeWhile { it < subscriptionIndex }
.ignoreElements()
.cast(Nothing::class.java)
/**
* Wraps [item] with the [subscriptionIndex] of the stream from which it's emitted through the [source]'s [observableWithSubscription].
*/
data class ItemWrapper<out T, out R>(
val item: R,
val subscriptionIndex: SubscriptionIndex,
val source: SingletonObservable<out T>
) {
/**
* @see [SingletonObservable.observeUnsubscription]
*/
fun observeUnsubscription() = source.observeUnsubscription(subscriptionIndex)
/**
* @see [SingletonObservable.observeResubscription]
*/
fun observeResubscription() = source.observeResubscription(subscriptionIndex)
/**
* Returns a new [ItemWrapper] with a different [item] but the same [subscriptionIndex] and [source].
*/
fun <R2> withItem(item: R2): ItemWrapper<T, R2> =
ItemWrapper(
item = item,
subscriptionIndex = subscriptionIndex,
source = source)
}
}
/**
* Using [Observable.flatMap], applies [transform] to every [ItemWrapper.item] in the sequence, then wraps back every element in the
* resulting flattened sequence in a new [ItemWrapper] that has the same [ItemWrapper.subscriptionIndex] and [ItemWrapper.source].
*/
inline fun <T, R, R2> Observable<ItemWrapper<T, R>>.flatMapItem(
crossinline transform: (item: R) -> Observable<R2>
): Observable<ItemWrapper<T, R2>> =
flatMap { wrapper -> transform(wrapper.item).map { item -> wrapper.withItem(item) } }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment