Skip to content

Instantly share code, notes, and snippets.

@PaulWoitaschek
Last active December 27, 2017 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PaulWoitaschek/3c069e61900e622ddd6e7e78781cd353 to your computer and use it in GitHub Desktop.
Save PaulWoitaschek/3c069e61900e622ddd6e7e78781cd353 to your computer and use it in GitHub Desktop.
Reactive Repository
class Repo<in Key : Any, Value : Any>(
private val dataSource: DataSource<Key, Value>,
private val persister: Persister<Key, Value>
) {
private val cleared = PublishSubject.create<Key>()
private val cache = HashMap<Key, Flowable<Value>>()
fun stream(key: Key): Flowable<Value> =
cache.getOrPut(key) { newStream(key) }
fun clear(key: Key): Completable =
persister.delete(key)
.doOnComplete { cleared.onNext(key) }
private fun newStream(key: Key): Flowable<Value> =
persister.get(key)
.toFlowable()
.flatMap {
when (it) {
is Result.Fresh -> Flowable.just(it.value)
is Result.Stale -> Single.just(it.value).concatWith(getAndStore(key))
is Result.Missing -> getAndStore(key).toFlowable()
}
}
.repeatWhen { it.switchMap { cleared.toFlowable(BackpressureStrategy.LATEST) } }
.share()
private fun getAndStore(key: Key): Single<Value> =
dataSource.get(key)
.flatMap { value ->
persister.set(key, value)
.toSingleDefault(value)
}
}
interface DataSource<in Key : Any, Value : Any> {
fun get(key: Key): Single<Value>
}
interface Persister<in Key : Any, Value : Any> {
fun get(key: Key): Single<Result<Value>>
fun set(key: Key, value: Value): Completable
fun delete(key: Key): Completable
}
@Suppress("unused")
sealed class Result<Value : Any> {
data class Fresh<Value : Any>(val value: Value) : Result<Value>()
data class Stale<Value : Any>(val value: Value) : Result<Value>()
class Missing<Value : Any> : Result<Value>()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment