Instantly share code, notes, and snippets.

Embed
What would you like to do?
Test and prototype for a more specific backpressure strategy in RxJava 2
package com.jaynewstrom.rx
import io.reactivex.Flowable
import io.reactivex.Scheduler
import io.reactivex.schedulers.Schedulers
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
/**
 * Manual prototype that exercises [takeOnlyMostRecentEvents] against a consumer that is
 * slower than its source.
 */
class RxBackPressure {
    /**
     * Emits a tick every 50 ms, pushes the ticks through [takeOnlyMostRecentEvents] and
     * consumes the survivors with an artificially slow mapper, printing each value that
     * reaches the subscriber.
     *
     * The test waits until 500 ticks have been produced upstream (at most one minute),
     * fails if that never happens, and always disposes the stream so it does not keep
     * running after the test returns.
     */
    @Test fun yay() {
        val atomicCount = AtomicLong()
        val latch = CountDownLatch(500)
        val subscription = Flowable.interval(0, 50, TimeUnit.MILLISECONDS)
            .map {
                // Counted before any dropping/filtering, so the latch tracks produced ticks,
                // not delivered ones.
                latch.countDown()
                atomicCount.incrementAndGet()
            }
            .takeOnlyMostRecentEvents(Schedulers.io(), 50, TimeUnit.MILLISECONDS)
            .map { count ->
                // Simulated work: sometimes faster (30 ms) and sometimes slower (80/200 ms)
                // than the 50 ms source interval.
                val workMillis = when {
                    count % 3 == 0L -> 30L
                    count % 2 == 0L -> 80L
                    else -> 200L
                }
                try {
                    Thread.sleep(workMillis)
                } catch (e: InterruptedException) {
                    // Disposing the stream may interrupt this thread mid-sleep; keep the
                    // interrupt flag instead of raising an error nobody can receive.
                    Thread.currentThread().interrupt()
                }
                count
            }
            .subscribe {
                println("At position: $it - at: ${System.currentTimeMillis()} - on: ${Thread.currentThread().name}")
            }
        try {
            val producedAllTicks = latch.await(1, TimeUnit.MINUTES)
            check(producedAllTicks) {
                "Timed out with ${latch.count} of 500 source ticks still outstanding"
            }
        } finally {
            // Stop the interval; otherwise it keeps ticking and printing after the test ends.
            subscription.dispose()
        }
    }
}
/**
 * Passes along only the events that are still recent by the time they reach [scheduler].
 *
 * Every upstream event is wrapped in a [DataHolder], which timestamps it, before the
 * stream hops to [scheduler] via `observeOn`. After the hop, an event is forwarded only
 * if less than one source emission interval has elapsed since it was wrapped; older
 * events are filtered out. `onBackpressureDrop()` is applied to the source first.
 *
 * @param scheduler scheduler on which the freshness check and downstream delivery run
 * @param sourceEmisionInterval expected interval between source emissions; also the
 *   maximum age an event may have and still be forwarded
 * @param sourceEmissionIntervalUnit unit of [sourceEmisionInterval]
 * @return a [Flowable] that emits only events that pass the freshness check
 */
fun <T> Flowable<T>.takeOnlyMostRecentEvents(
    scheduler: Scheduler,
    sourceEmisionInterval: Long,
    sourceEmissionIntervalUnit: TimeUnit
): Flowable<T> {
    val freshnessWindowMillis = sourceEmissionIntervalUnit.toMillis(sourceEmisionInterval)

    // Timestamp on the producing side, before the thread hop, so the age measured later
    // includes any time spent queued in front of the consumer.
    val stamped = onBackpressureDrop().map { DataHolder(it) }

    return stamped
        .observeOn(scheduler)
        .filter { holder -> holder.isCurrent(freshnessWindowMillis) }
        .map { holder -> holder.sourceEvent }
}
/**
 * Pairs an event with the moment it was wrapped so that its age can be checked later.
 *
 * Timestamps come from [System.nanoTime] (reduced to milliseconds) rather than the
 * wall clock, so the measured age is not affected by system clock adjustments. As a
 * consequence [emittedMillis] is only meaningful relative to other readings taken the
 * same way within this JVM; it is not an epoch timestamp.
 *
 * @property sourceEvent the wrapped event
 * @property emittedMillis monotonic millisecond reading taken when the event was wrapped
 */
private class DataHolder<T>(
    val sourceEvent: T,
    val emittedMillis: Long = monotonicMillis()
) {
    /**
     * Returns `true` when strictly less than [sourceEmissionIntervalMillis] milliseconds
     * have elapsed since this holder was created.
     */
    fun isCurrent(sourceEmissionIntervalMillis: Long): Boolean {
        val ageMillis = monotonicMillis() - emittedMillis
        return ageMillis < sourceEmissionIntervalMillis
    }

    private companion object {
        /** Current monotonic clock reading in milliseconds. */
        fun monotonicMillis(): Long = System.nanoTime() / 1_000_000
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment