Created
April 16, 2019 06:24
-
-
Save uzzu/8bcb377d65d9cf71b5bd006438eca0f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import androidx.annotation.VisibleForTesting | |
import io.reactivex.Observable | |
import io.reactivex.Single | |
class RxSingleHot { | |
private data class SharedObservable( | |
val id: Int, | |
val observable: Observable<*> | |
) | |
private val lock: Any = Any() | |
private val map: MutableMap<String, SharedObservable> = mutableMapOf() | |
private var serial: Int = 0 | |
fun <T> getOrCreate(key: String, factory: () -> Single<T>): Single<T> = synchronized(lock) { | |
if (map.containsKey(key)) { | |
@Suppress("unchecked_cast") | |
val cached = checkNotNull(map[key]).observable as Observable<T> | |
return@synchronized cached | |
} | |
map.remove(key) | |
val id = ++serial | |
val created = map.getOrPut(key) { | |
SharedObservable( | |
id, | |
factory() | |
.toObservable() | |
.doOnEach { println("Original source event: $it") } | |
.doOnTerminate { println("terminate"); removeBy(key, id) } | |
.doOnDispose { println("dispose"); removeBy(key, id) } | |
.share() | |
) | |
} | |
@Suppress("unchecked_cast") | |
return@synchronized created.observable as Observable<T> | |
}.lastOrError().doOnSuccess { println("Downstream source success[$it]") } | |
private fun removeBy(key: String, id: Int) { | |
synchronized(lock) { | |
if (!map.containsKey(key)) { | |
return | |
} | |
val shared = checkNotNull(map[key]) | |
if (shared.id != id) { | |
return | |
} | |
map.remove(key) | |
} | |
} | |
// region VisibleForTesting | |
@VisibleForTesting | |
internal val size | |
get() = map.size | |
@VisibleForTesting | |
internal fun isEmpty() = map.isEmpty() | |
// endregion | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.truth.Expect | |
import io.reactivex.Observable | |
import io.reactivex.Single | |
import org.junit.Rule | |
import org.junit.Test | |
import java.util.concurrent.TimeUnit | |
class RxSingleHotTest { | |
@Rule | |
@JvmField | |
var expect = Expect.create() | |
private fun calc(value: Int) = | |
Observable | |
.timer(10, TimeUnit.MILLISECONDS) | |
.map { value * value } | |
.singleOrError() | |
@Test | |
fun basicHotStream() { | |
// Given | |
val times = 3 | |
val withHot = RxSingleHot() | |
val singles = mutableListOf<Single<Int>>() | |
// precondition | |
repeat(times) { singles.add(withHot.getOrCreate("hot") { calc(it) }) } | |
expect.that(withHot.size).isEqualTo(1) | |
// When | |
val observers = singles.map { it.test() } | |
observers.forEach { it.await() } | |
// Then | |
observers.forEach { it.assertComplete() } | |
observers.forEach { it.assertNoErrors() } | |
observers.forEach { it.assertValue(0) } | |
expect.that(observers.map { it.values()[0] }.distinct().size).isEqualTo(1) | |
expect.that(withHot.isEmpty()).isTrue() | |
} | |
@Test | |
fun differentKeys() { | |
// Given | |
val times = 3 | |
val withHot = RxSingleHot() | |
val singles = mutableListOf<Single<Int>>() | |
// precondition | |
repeat(times) { singles.add(withHot.getOrCreate("hot$it") { calc(it) }) } | |
expect.that(withHot.size).isEqualTo(times) | |
// When | |
val observers = singles.map { it.test() } | |
observers.forEach { it.await() } | |
// Then | |
observers.forEach { it.assertComplete() } | |
observers.forEach { it.assertNoErrors() } | |
observers.forEachIndexed { i, it -> it.assertValue(i * i) } | |
expect.that(observers.map { it.values()[0] }.distinct().size).isEqualTo(times) | |
expect.that(withHot.isEmpty()).isTrue() | |
} | |
@Test | |
fun removeSharedObservableBeforeEmitValue() { | |
val withHot = RxSingleHot() | |
// precondition | |
val single = withHot.getOrCreate("hot") { calc(100) } | |
// Then | |
.doOnSuccess { expect.that(withHot.isEmpty()).isTrue() } | |
expect.that(withHot.size).isEqualTo(1) | |
// When | |
single.test().await().assertValue(100 * 100) | |
} | |
@Test | |
fun removeSharedObservableIfDisposed() { | |
val withHot = RxSingleHot() | |
// precondition | |
val single = withHot.getOrCreate("hot") { calc(100) } | |
expect.that(withHot.size).isEqualTo(1) | |
// When | |
val observer = single.test() | |
observer.dispose() | |
expect.that(withHot.isEmpty()).isTrue() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original source event: OnNextNotification[0] | |
Original source event: OnCompleteNotification | |
terminate | |
Downstream source success[0] | |
Downstream source success[0] | |
Downstream source success[0] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment