Skip to content

Instantly share code, notes, and snippets.

@uzzu
Created April 16, 2019 06:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save uzzu/8bcb377d65d9cf71b5bd006438eca0f9 to your computer and use it in GitHub Desktop.
Save uzzu/8bcb377d65d9cf71b5bd006438eca0f9 to your computer and use it in GitHub Desktop.
import androidx.annotation.VisibleForTesting
import io.reactivex.Observable
import io.reactivex.Single
class RxSingleHot {
private data class SharedObservable(
val id: Int,
val observable: Observable<*>
)
private val lock: Any = Any()
private val map: MutableMap<String, SharedObservable> = mutableMapOf()
private var serial: Int = 0
fun <T> getOrCreate(key: String, factory: () -> Single<T>): Single<T> = synchronized(lock) {
if (map.containsKey(key)) {
@Suppress("unchecked_cast")
val cached = checkNotNull(map[key]).observable as Observable<T>
return@synchronized cached
}
map.remove(key)
val id = ++serial
val created = map.getOrPut(key) {
SharedObservable(
id,
factory()
.toObservable()
.doOnEach { println("Original source event: $it") }
.doOnTerminate { println("terminate"); removeBy(key, id) }
.doOnDispose { println("dispose"); removeBy(key, id) }
.share()
)
}
@Suppress("unchecked_cast")
return@synchronized created.observable as Observable<T>
}.lastOrError().doOnSuccess { println("Downstream source success[$it]") }
private fun removeBy(key: String, id: Int) {
synchronized(lock) {
if (!map.containsKey(key)) {
return
}
val shared = checkNotNull(map[key])
if (shared.id != id) {
return
}
map.remove(key)
}
}
// region VisibleForTesting
@VisibleForTesting
internal val size
get() = map.size
@VisibleForTesting
internal fun isEmpty() = map.isEmpty()
// endregion
}
import com.google.common.truth.Expect
import io.reactivex.Observable
import io.reactivex.Single
import org.junit.Rule
import org.junit.Test
import java.util.concurrent.TimeUnit
class RxSingleHotTest {
@Rule
@JvmField
var expect = Expect.create()
private fun calc(value: Int) =
Observable
.timer(10, TimeUnit.MILLISECONDS)
.map { value * value }
.singleOrError()
@Test
fun basicHotStream() {
// Given
val times = 3
val withHot = RxSingleHot()
val singles = mutableListOf<Single<Int>>()
// precondition
repeat(times) { singles.add(withHot.getOrCreate("hot") { calc(it) }) }
expect.that(withHot.size).isEqualTo(1)
// When
val observers = singles.map { it.test() }
observers.forEach { it.await() }
// Then
observers.forEach { it.assertComplete() }
observers.forEach { it.assertNoErrors() }
observers.forEach { it.assertValue(0) }
expect.that(observers.map { it.values()[0] }.distinct().size).isEqualTo(1)
expect.that(withHot.isEmpty()).isTrue()
}
@Test
fun differentKeys() {
// Given
val times = 3
val withHot = RxSingleHot()
val singles = mutableListOf<Single<Int>>()
// precondition
repeat(times) { singles.add(withHot.getOrCreate("hot$it") { calc(it) }) }
expect.that(withHot.size).isEqualTo(times)
// When
val observers = singles.map { it.test() }
observers.forEach { it.await() }
// Then
observers.forEach { it.assertComplete() }
observers.forEach { it.assertNoErrors() }
observers.forEachIndexed { i, it -> it.assertValue(i * i) }
expect.that(observers.map { it.values()[0] }.distinct().size).isEqualTo(times)
expect.that(withHot.isEmpty()).isTrue()
}
@Test
fun removeSharedObservableBeforeEmitValue() {
val withHot = RxSingleHot()
// precondition
val single = withHot.getOrCreate("hot") { calc(100) }
// Then
.doOnSuccess { expect.that(withHot.isEmpty()).isTrue() }
expect.that(withHot.size).isEqualTo(1)
// When
single.test().await().assertValue(100 * 100)
}
@Test
fun removeSharedObservableIfDisposed() {
val withHot = RxSingleHot()
// precondition
val single = withHot.getOrCreate("hot") { calc(100) }
expect.that(withHot.size).isEqualTo(1)
// When
val observer = single.test()
observer.dispose()
expect.that(withHot.isEmpty()).isTrue()
}
}
Original source event: OnNextNotification[0]
Original source event: OnCompleteNotification
terminate
Downstream source success[0]
Downstream source success[0]
Downstream source success[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment