@dimsuz · Last active November 6, 2019 18:08

RxJava2 operator which adjusts items of an LCE (loading-content-error) stream so that L (loading) and C/E (content/error) items are not emitted close to each other. This is a reactive state alternative to Android's ContentLoadingProgressBar: https://developer.android.com/reference/android/support/v4/widget/ContentLoadingProgressBar.html

package com.dimsuz.lcefilterdelay

import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.exceptions.Exceptions
import io.reactivex.internal.disposables.DisposableHelper
import io.reactivex.internal.fuseable.SimpleQueue
import io.reactivex.internal.queue.SpscLinkedArrayQueue
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference

/**
 * Adjusts items of an LCE (loading-content-error) stream so that L (loading) and C/E (content/error) items
 * are not emitted close to each other.
 *
 * Whenever the source observable emits an `L-item` (determined by [loadingItemPredicate]) there are two
 * possible scenarios:
 *
 * * If the `L-item` is followed by a `C/E-item` and the time between the `L` and `C/E` items is less than
 * [acceptableContentItemDelay] (milliseconds), the `L-item` is swallowed and only the `C/E-item` is emitted.
 * * If, on the other hand, the source observable emits an `L-item` and then does not emit any `C/E-item`
 * within the [acceptableContentItemDelay] interval, the `L-item` is not swallowed and the next `C/E-item`
 * is delayed so that at least [minDelayFromLoadingItem] milliseconds pass after the `L-item` (if it comes sooner).
 * This is done to avoid flickering between the `L => C/E` states when the `C/E-item` is emitted close to the
 * delayed L-item, i.e. the loading state is shown to the user for at least [minDelayFromLoadingItem].
 *
 * The effect of applying this operator is that the stream is shifted by at least [acceptableContentItemDelay],
 * because each `L-item` is always held back until [acceptableContentItemDelay] has passed, so that it can be
 * determined which of the two routes above should be taken.
 *
 * Also note that after an `L-item` is emitted:
 * * All successive `L-items` emitted during the [acceptableContentItemDelay] interval are ignored.
 * After the next `C/E-item` is emitted, later `L-items` are treated according to the rules above.
 * * All successive `C/E-items` emitted during the [acceptableContentItemDelay] interval are queued and emitted
 * immediately after the [minDelayFromLoadingItem] delay.
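 *
 * A minimal usage sketch; the `LceState` type, the `stateChanges` stream and the `render` function are
 * hypothetical, shown only to illustrate how the operator might be applied:
 *
 * ```
 * sealed class LceState {
 *   object Loading : LceState()
 *   data class Content(val items: List<String>) : LceState()
 *   data class Error(val message: String) : LceState()
 * }
 *
 * val adjusted: Observable<LceState> = stateChanges
 *   .lceFilterDelay(
 *     loadingItemPredicate = { it is LceState.Loading },
 *     acceptableContentItemDelay = 300L, // ms: swallow L if C/E arrives within this window
 *     minDelayFromLoadingItem = 700L     // ms: once L is shown, keep it for at least this long
 *   )
 *
 * adjusted.subscribe { state -> render(state) }
 * ```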
*/
fun <T> Observable<T>.lceFilterDelay(
  loadingItemPredicate: (T) -> Boolean,
  acceptableContentItemDelay: Long,
  minDelayFromLoadingItem: Long,
  scheduler: Scheduler = Schedulers.computation()
): Observable<T> {
  return RxJavaPlugins.onAssembly(
    ObservableLceFilterDelay(
      this,
      loadingItemPredicate,
      acceptableContentItemDelay,
      minDelayFromLoadingItem,
      scheduler
    )
  )
}

private class ObservableLceFilterDelay<T>(
  private val source: ObservableSource<T>,
  private val loadingPredicate: (T) -> Boolean,
  private val acceptableContentDelay: Long,
  private val minDelayFromLoadingState: Long,
  private val scheduler: Scheduler
) : Observable<T>() {

  override fun subscribeActual(s: Observer<in T>) {
    source.subscribe(
      FilterDelayObserver(
        s,
        loadingPredicate,
        acceptableContentDelay,
        minDelayFromLoadingState,
        scheduler.createWorker()
      )
    )
  }

  class FilterDelayObserver<T>(
    private val actual: Observer<in T>,
    private val loadingPredicate: (T) -> Boolean,
    private val acceptableContentDelay: Long,
    private val minDelayFromLoadingState: Long,
    private val w: Scheduler.Worker
  ) : Observer<T>, Disposable {

    private var d: Disposable? = null
    private val queue = SpscLinkedArrayQueue<T>(12)
    private val error = AtomicReference<Throwable?>(null)
    private val completed = AtomicBoolean(false)
    private val disposed = AtomicBoolean(false)

    // will be non-zero only right after emitting a load event and before emitting
    // the next content event
    private val loadEmitTimestamp = AtomicLong(0)

    // NOTE: any delayed stream item should follow this pattern:
    // put the item in the queue + schedule a `QueueDrain()` runnable.
    // This ensures that onComplete/onError are called after queue processing
    // and that the stream won't complete until the queue is drained.
    override fun onSubscribe(d: Disposable) {
      if (DisposableHelper.validate(this.d, d)) {
        this.d = d
        actual.onSubscribe(this)
      }
    }

    override fun onNext(t: T) {
      if (loadingPredicate(t)) {
        if (queue.isEmpty) {
          // enqueue L-item to later see if a C-item has arrived within the acceptable time interval
          queue.offer(t)
          w.schedule(QueueDrain(), acceptableContentDelay, TimeUnit.MILLISECONDS)
        } else {
          // all L-events arriving during draining are ignored
          // (handling them would complicate things, and multiple load events
          // are not really needed in the UI anyway)
        }
      } else { // received C-item
        if (queue.isEmpty) {
          if (loadEmitTimestamp.get() == 0L) {
            // no queuing is going on, no L-events were spotted on the horizon,
            // continue as usual
            actual.onNext(t)
          } else {
            // an L-event has been emitted just before this C-item,
            // schedule it for processing later with the minimal delay required:
            // it must come no sooner than minDelayFromLoadingState after the L-item
            val ts = loadEmitTimestamp.getAndSet(0)
            queue.offer(t)
            val delay = minDelayFromLoadingState - (System.currentTimeMillis() - ts)
            w.schedule(QueueDrain(), maxOf(delay, 0), TimeUnit.MILLISECONDS)
          }
        } else {
          // queue draining is scheduled or in progress, enqueue this item too
          queue.offer(t)
        }
      }
    }

    override fun onError(e: Throwable) {
      // the source observable's onComplete() or onError() might
      // be called while we are busy processing enqueued and delayed items;
      // if so, delay termination until the queue is drained
      if (queue.isEmpty) {
        actualError(e)
      } else {
        error.set(e)
      }
    }

    override fun onComplete() {
      // the source observable's onComplete() or onError() might
      // be called while we are busy processing enqueued and delayed items;
      // if so, delay completion until the queue is drained
      if (queue.isEmpty) {
        actualComplete()
      } else {
        completed.set(true)
      }
    }

    private fun actualComplete() {
      try {
        actual.onComplete()
      } finally {
        dispose()
      }
    }

    private fun actualError(e: Throwable) {
      try {
        actual.onError(e)
      } finally {
        dispose()
      }
    }

    override fun dispose() {
      disposed.set(true)
      d?.dispose()
      w.dispose()
    }

    override fun isDisposed(): Boolean {
      return w.isDisposed
    }

    private inner class QueueDrain : Runnable {
      override fun run() {
        while (true) {
          if (disposed.get()) {
            queue.clear()
            return
          }
          val t: T? = queue.safePoll()
          val empty = t == null
          if (empty) {
            // if the source observable's onComplete/onError were received during
            // queue processing, now is a good time to act on them - after it has finished
            if (completed.get()) {
              actualComplete()
            } else {
              error.get()?.also { actualError(it) }
            }
            return
          } else {
            if (loadingPredicate(t!!)) {
              // check whether this L-item and the next C-item are close enough: if yes, skip L
              val next: T? = queue.peek()
              check(next == null || !loadingPredicate(next)) {
                "loading events must not be enqueued while waiting for content event!"
              }
              val emitLoad = next == null
              if (emitLoad) {
                // no C-item, so this L-item goes down the stream;
                // the timestamp is used to determine by how much to delay the next C-item
                loadEmitTimestamp.set(System.currentTimeMillis())
                actual.onNext(t)
              } // else there's a C-item and it's close, it will be emitted on the next loop iteration
            } else {
              actual.onNext(t)
            }
          }
        }
      }
    }

    private fun SimpleQueue<T>.safePoll(): T? {
      return try {
        this.poll()
      } catch (e: Throwable) {
        Exceptions.throwIfFatal(e)
        dispose()
        queue.clear()
        actual.onError(e)
        null
      }
    }
  }
}
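
As a rough sketch of how the swallowing behaviour could be exercised, the snippet below (a separate file in the same package) feeds a plain String stream through the operator on a TestScheduler, with "L" standing in for a loading item. The String stand-ins and the concrete delays are illustrative assumptions; only the swallow path is shown because the minimum-delay path measures elapsed time with System.currentTimeMillis(), which does not follow the TestScheduler's virtual clock and is therefore harder to assert deterministically.

package com.dimsuz.lcefilterdelay

import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun main() {
  val scheduler = TestScheduler()
  val source = PublishSubject.create<String>()
  val observer = TestObserver<String>()

  source
    .lceFilterDelay(
      loadingItemPredicate = { it == "L" },
      acceptableContentItemDelay = 300L,
      minDelayFromLoadingItem = 700L,
      scheduler = scheduler
    )
    .subscribe(observer)

  // "L" is held back while the operator waits to see whether content arrives quickly enough
  source.onNext("L")
  observer.assertNoValues()

  // "C" arrives 100 virtual ms later, well within acceptableContentItemDelay
  scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
  source.onNext("C")

  // once the 300 ms window elapses the queue drain runs: "L" is swallowed, only "C" is emitted
  scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)
  observer.assertValues("C")
}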