Skip to content

Instantly share code, notes, and snippets.

@Syex
Last active May 14, 2018 18:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Syex/a6f62f87a3a218b09edf6e8a2a9b2cfc to your computer and use it in GitHub Desktop.
Save Syex/a6f62f87a3a218b09edf6e8a2a9b2cfc to your computer and use it in GitHub Desktop.
class Emitter(private val stateObservable: Observable<State>) {
private val subject = BehaviorSubject.create<Data>()
private var subscriberCount = AtomicInteger()
private val onSubscribeConsumer: (() -> Unit) = {
if (subscriberCount.getAndIncrement() == 0) subscribeToObservable()
}
private val onDisposeConsumer: (() -> Unit) = {
if (subscriberCount.decrementAndGet() == 0) unsubscribeFromObservable()
}
private var defaultStateDisposable: Disposable? = null
private var specialStateDisposable: Disposable? = null
private var state: State? = null
init {
stateObservable.subscribe { stateChanged(it) }
}
private fun stateChanged(state: State) {
this.state = state
if (subscriberCount.get() > 0) {
unsubscribeFromObservable()
subscribeToObservable()
}
}
fun stateObservable(): Observable<Data> {
return subject
.doOnSubscribe { onSubscribeConsumer() }
.doOnDispose { onDisposeConsumer() }
}
private fun subscribeToObservable() {
when (state) {
NORMAL -> defaultStateDisposable = defaultStateObservable().subscribe { subject.onNext { it } }
SPECIAL -> specialStateDisposable = pecialStateObservable().subscribe { subject.onNext { it } }
}
}
private fun unsubscribeFromObservable() {
specialStateDisposable?.dispose()
defaultStateDisposable?.dispose()
specialStateDisposable = null
defaultStateDisposable = null
}
private fun defaultStateObservable(): Observable<Data>
private fun specialStateObservable(): Observable<Data>
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment