Skip to content

Instantly share code, notes, and snippets.

@rossdanderson
Created April 24, 2016 13:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossdanderson/10bda3a9a7444acf52d483ee0a024468 to your computer and use it in GitHub Desktop.
Save rossdanderson/10bda3a9a7444acf52d483ee0a024468 to your computer and use it in GitHub Desktop.
Shares a number of underlying observables by provided keys. Removes the underlying observable from the cache if there are no binding subscriptions to it, or if it terminates.
package streams
import rx.Observable
import rx.observers.Subscribers.from
import rx.subscriptions.Subscriptions
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
 * Shares one underlying observable per key between all subscribers of that key.
 *
 * The source observable for a key is created lazily by [factory] the first time the key is
 * subscribed to, and is multicast with `share()`. Its cache entry is removed when the source
 * terminates or is unsubscribed from, so the next subscription for that key invokes [factory] again.
 *
 * All access to the cache is guarded by a single [ReentrantLock].
 *
 * @param K type of the keys used to look up shared observables
 * @param T type of the items emitted by the observables
 * @property factory creates the source observable for a given key
 */
class SharedObservableCache<K, T>(private val factory: (K) -> Observable<T>) {

    private val sharedObservables = HashMap<K, Observable<T>>()
    private val lock = ReentrantLock()

    /**
     * Returns an observable which, on every subscription, binds the subscriber to the shared
     * observable cached for [key], creating and caching it first if it is absent.
     *
     * @param key identifies the shared observable to bind to
     * @return an observable that forwards the events of the shared observable for [key]
     */
    operator fun get(key: K): Observable<T> {
        return Observable.create<T>(
            { subscriber ->
                val bindingSubscriber = from(subscriber)
                // Look up and subscribe under a single lock acquisition so the entry
                // cannot be evicted between the two steps.
                lock.withLock {
                    getSharedObservable(key).subscribe(bindingSubscriber)
                }
                // The binding subscriber does not share the downstream subscription, so
                // propagate downstream unsubscription to it explicitly.
                subscriber.add(Subscriptions.create(
                    {
                        lock.withLock {
                            bindingSubscriber.unsubscribe()
                        }
                    }))
            })
    }

    /**
     * Returns the cached shared observable for [key], creating it via [factory] if absent.
     * Must be called while holding [lock].
     */
    private fun getSharedObservable(key: K): Observable<T> {
        return sharedObservables.getOrPut(key,
            {
                // Holds the exact instance stored in the cache so that the eviction hooks
                // below can tell "their" entry apart from a newer one for the same key.
                // Written here (caller holds the lock) and read only under the lock.
                var shared: Observable<T>? = null
                val created = factory.invoke(key)
                    // Runs before onCompleted/onError reach subscribers, so that nobody
                    // can bind to an observable that is already terminating.
                    .doOnTerminate { lock.withLock { removeIfCurrent(key, shared) } }
                    // Take the lock explicitly instead of relying on the caller; the lock
                    // is reentrant, so this is safe when the caller already holds it.
                    .doOnUnsubscribe { lock.withLock { removeIfCurrent(key, shared) } }
                    .share()
                shared = created
                created
            })
    }

    /**
     * Removes the cache entry for [key] only if it is still [expected].
     * This keeps a late eviction hook of an old source from discarding a newer entry
     * that was cached for the same key in the meantime.
     * Must be called while holding [lock].
     */
    private fun removeIfCurrent(key: K, expected: Observable<T>?) {
        if (sharedObservables[key] === expected) {
            sharedObservables.remove(key)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment