Skip to content

Instantly share code, notes, and snippets.

@nanox77
Last active April 29, 2021 07:53
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nanox77/76ceac56da328cf31a02534a2158f12c to your computer and use it in GitHub Desktop.
Save nanox77/76ceac56da328cf31a02534a2158f12c to your computer and use it in GitHub Desktop.
Cache implementation for Single element using RxJava.
import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.SingleSource
import io.reactivex.SingleTransformer
import io.reactivex.functions.Consumer
import org.joda.time.DateTime
import java.util.concurrent.TimeUnit
class SingleRxCache<T>(private val timeout: Long, private val unit: TimeUnit) : SingleTransformer<T, T> {
override fun apply(upstream: Single<T>): SingleSource<T> {
val lastElementSeen = LastElementSeen<T>(timeout, unit)
return LastElementSeenSingle(upstream.doOnSuccess(lastElementSeen), lastElementSeen)
}
}
class LastElementSeenSingle<T>(private var upstream: Single<T>, private val lastElementSeen: LastElementSeen<T>) : SingleSource<T> {
override fun subscribe(observer: SingleObserver<in T>) {
if (lastElementSeen.isValid()) {
lastElementSeen.value?.let(observer::onSuccess) ?: upstream.subscribe(observer)
} else {
upstream.subscribe(observer)
}
}
}
class LastElementSeen<T>(private val timeout: Long, private val unit: TimeUnit) : Consumer<T> {
private var lastEmissionTimestamp: Long = 0
var value: T? = null
override fun accept(latest: T) {
lastEmissionTimestamp = System.currentTimeMillis()
value = latest
}
fun isValid(): Boolean {
// If you don't use JodaTime: System.currentTimeMillis() - lastEmissionTimestamp <= unit.toMillis(timeout)
return value?.let { DateTime.now().minus(lastEmissionTimestamp).isBefore(unit.toMillis(timeout)) } ?: false
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment