Skip to content

Instantly share code, notes, and snippets.

@clemp6r
Last active March 3, 2016 14:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.
Save clemp6r/bd8da18f70c15ea15a61 to your computer and use it in GitHub Desktop.
package com.wizbii.wizbiiandroid.services
import rx.Observable
import rx.subjects.PublishSubject
/**
* POC of a reactive data store.
*/
abstract class ReactiveStore<T> {
/**
* Local version of the data.
*/
private var value: T? = null
/**
* Bus for notifying new versions of the data to all subscribers.
*/
private var bus = PublishSubject.create<T>()
/**
* Observable for fetching the remote version of the data.
*/
protected abstract fun getRemoteData(): Observable<T>
/**
* Updates the remote data and emits the updated value.
*/
protected abstract fun updateRemoteData(parameters: Any): Observable<T>
/**
* Infinite stream of value updates.
*/
fun values(): Observable<T> {
return Observable.create {
if (value != null) {
it.onNext(value)
bus.subscribe(it)
} else {
bus.subscribe(it)
getRemoteData().subscribe(
{
onNewValue(it)
},
{
// TODO wrap errors into values without stopping the observable
bus.onError(it)
bus = PublishSubject.create<T>()
}
)
}
}
}
private fun onNewValue(it: T) {
value = it
bus.onNext(it)
}
fun updateValue(parameters: Any): Observable<T> {
return updateRemoteData(parameters).doOnNext { onNewValue(it) }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment