Skip to content

Instantly share code, notes, and snippets.

@steliosfran
Created August 1, 2021 12:45
Show Gist options
  • Save steliosfran/c73562c6cd40ea95957672207b3e9bae to your computer and use it in GitHub Desktop.
Save steliosfran/c73562c6cd40ea95957672207b3e9bae to your computer and use it in GitHub Desktop.
import io.reactivex.rxjava3.core.Completable
import io.reactivex.rxjava3.plugins.RxJavaPlugins
import io.reactivex.rxjava3.schedulers.Schedulers
import net.jodah.concurrentunit.Waiter
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.util.Timer
import kotlin.concurrent.schedule
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@DisplayName("Given we merge two streams")
class SafeMergeArrayTest {
@Nested
@DisplayName("When the first stream throws an error")
inner class FirstStreamError {
private val completable1 = Completable.error(
Throwable("error 1")
).subscribeOn(Schedulers.io())
@Nested
@DisplayName("When the second stream succeeds")
inner class SecondStreamSuccess {
private val completable2 = Completable.complete()
.subscribeOn(Schedulers.io())
@Test
fun `then the merged stream throws an error`() {
safeMergeArray(completable1, completable2).test().await().assertError {
it.message == "error 1"
}
}
}
@Nested
@DisplayName("When the second stream throws an error")
inner class SecondStreamError {
private val completable2 = Completable.error(
Throwable("error 2")
).subscribeOn(Schedulers.io())
@Test
fun `then the merged stream throws an error`() {
safeMergeArray(completable1, completable2).test().await().assertError {
it.message == "error 1"
}
}
}
@Nested
@DisplayName("When the second stream throws an error simultaneously")
inner class SecondStreamErrorSimultaneously {
private val completable2 = Completable.error(
Throwable("error 2")
).subscribeOn(Schedulers.io())
@Test
fun `then the merged stream does not crash`() {
val waiter = Waiter()
RxJavaPlugins.setErrorHandler {
// Fail the test if we receive an exception in the error handler
waiter.fail("Error handler received an exception: $it")
}
safeMergeArray(completable1, completable2).test().await()
Timer().schedule(10) {
// Pass the test after 10ms if we don't receive an exception
waiter.resume()
}
// Await for waiter.resume() or waiter.fail()
waiter.await()
}
}
}
@Nested
@DisplayName("When the first stream succeeds")
inner class FirstStreamSuccess {
private val completable1 = Completable.complete()
.subscribeOn(Schedulers.io())
@Nested
@DisplayName("When the second stream succeeds")
inner class SecondStreamSuccess {
private val completable2 = Completable.complete()
.subscribeOn(Schedulers.io())
@Test
fun `then the merged stream succeeds`() {
safeMergeArray(completable1, completable2).test().await().assertComplete()
}
}
@Nested
@DisplayName("When the second stream throws an error")
inner class SecondStreamError {
private val completable2 = Completable.error(
Throwable("error 2")
).subscribeOn(Schedulers.io())
@Test
fun `then the merged stream throws an error`() {
safeMergeArray(completable1, completable2).test().await().assertError {
it.message == "error 2"
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment