Skip to content

Instantly share code, notes, and snippets.

@TimoPtr
Last active July 16, 2018 08:33
Show Gist options
  • Save TimoPtr/e50e1bc292d4963ac6a9b79a3d44bddc to your computer and use it in GitHub Desktop.
Save TimoPtr/e50e1bc292d4963ac6a9b79a3d44bddc to your computer and use it in GitHub Desktop.
Debounce buffer with a timeout guard (RxJava)
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject
import org.junit.Assert.assertEquals
import org.junit.Test
import java.util.concurrent.TimeUnit
/**
 * Rx sample demonstrating a combination of operators: a shared source is collected with
 * `buffer`, and the buffer is flushed by a boundary built from `debounce`, `timeout` and `repeat`.
 */
class RxSampleTest {

    /**
     * Pushes several bursts of string lists through a [PublishSubject], buffers them using a
     * debounce-based boundary guarded by a timeout, prints every non-empty buffer together with
     * its flattened element count, and finally asserts that the stream terminated without errors.
     */
    @Test
    fun debounceBufferGuard() {
        val subject = PublishSubject.create<ArrayList<String>>()

        // Share the source so that the boundary and the buffer observe the same emissions.
        val share = subject.share()

        // Boundary that triggers the flush of the buffer: it emits `true` once the source has
        // been quiet for 1 ms.
        val boundary = share.map { true }
            .debounce(1, TimeUnit.MILLISECONDS)
            // Guard: when the debounce produces nothing for 20 ms (e.g. the source is faster than
            // the debounce), fall back to a single `false` and resubscribe via repeat(), which
            // limits how large the buffer can grow.
            .timeout(20, TimeUnit.MILLISECONDS, Observable.just(false))
            .repeat()
            .doOnNext { signal -> println(signal) }

        // Buffer of the source, flushed every time the boundary emits.
        val testObs = share.buffer(boundary)
            // Drop empty buffers produced when the boundary fires while the source is silent.
            .filter { chunk -> chunk.isNotEmpty() }
            .doOnNext { chunk ->
                println("$chunk")
                // Flattened element count. flatten() builds a new list, so the emitted lists are
                // left untouched (accumulating with addAll would mutate the first list in place
                // and alter what downstream observers receive).
                println(chunk.flatten().size)
            }
            .test()

        subject.onNext(arrayListOf("Hello", "World"))
        subject.onNext(arrayListOf("Good", "bye"))
        Thread.sleep(2)
        subject.onNext(arrayListOf("Welcome", "back"))
        Thread.sleep(2)
        subject.onNext(arrayListOf("End"))
        // Rapid burst of emissions with no pause in between.
        repeat(1000) {
            subject.onNext(arrayListOf("End"))
        }
        subject.onComplete()

        testObs.await()
        testObs.assertNoErrors()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment