Skip to content

Instantly share code, notes, and snippets.

@vchernyshov
Created March 23, 2020 12:23
Show Gist options
  • Save vchernyshov/5fcca5bf35a84a7eed3a7a5b88d288ab to your computer and use it in GitHub Desktop.
Save vchernyshov/5fcca5bf35a84a7eed3a7a5b88d288ab to your computer and use it in GitHub Desktop.
internal fun <T> Observable<T>.bufferValuesWhileIdle(
isIdleObservable: Observable<Boolean>,
bufferSize: Int? = null
): Observable<T> =
observable { emitter->
val compositeDisposable = CompositeDisposable()
emitter.setDisposable(compositeDisposable)
var done = false
var isIdle = false
val bufferedValues: Queue<T> = LinkedList()
subscribe(
object : ObservableObserver<T>, CompletableCallbacks {
override fun onSubscribe(disposable: Disposable) {
compositeDisposable += disposable
compositeDisposable += isIdleObservable.subscribe {
if (it) {
isIdle = true
} else {
isIdle = false
bufferedValues.forEach { v ->
onNext(v)
}
bufferedValues.clear()
}
}
}
override fun onNext(value: T) {
if (done) {
return
}
if (isIdle) {
if (bufferedValues.size == bufferSize) {
bufferedValues.poll()
}
bufferedValues.offer(value)
} else {
emitter.onNext(value)
}
}
override fun onError(error: Throwable) {
if (done) {
handleReaktiveError(error)
return
}
done = true
compositeDisposable.dispose()
emitter.onError(error)
}
override fun onComplete() {
if (done) {
return
}
done = true
compositeDisposable.dispose()
emitter.onComplete()
}
}
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment