Skip to content

Instantly share code, notes, and snippets.

@kingori
Last active December 2, 2017 12:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kingori/c0c2e94ae3a2ecf7fb2417f617304c0b to your computer and use it in GitHub Desktop.
Save kingori/c0c2e94ae3a2ecf7fb2417f617304c0b to your computer and use it in GitHub Desktop.
RxJava Single Test
import io.reactivex.Single
import io.reactivex.exceptions.UndeliverableException
import io.reactivex.schedulers.Schedulers
import org.junit.Assert
import org.junit.Assert.assertTrue
import org.junit.Assert.fail
import org.junit.Test
import java.util.concurrent.CancellationException
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
class RxSingleTakeUntilTest {
@Test
fun singleSuccess() {
val result = Single.just(true)
.blockingGet()
Assert.assertEquals(true, result)
}
@Test(expected = CancellationException::class)
fun expectCancellationException1() {
Single.fromCallable {
println(Thread.currentThread().name)
Thread.sleep(1000)
true
}
.takeUntil(Single.just(true))
.subscribeOn(Schedulers.newThread())
.blockingGet()
}
@Test
fun expectCancellationException2() {
val latch = CountDownLatch(1)
Single.fromCallable {
Thread.sleep(1000)
true
}
.takeUntil(Single.just(true))
.subscribeOn(Schedulers.newThread())
.subscribe({
println("success")
fail()
latch.countDown()
}, {
println("err")
it.printStackTrace()
assert(it is CancellationException)
latch.countDown()
})
latch.await()
}
@Test
fun expectUndeliveredException() {
val latch = CountDownLatch(1)
val catchException = object : Thread.UncaughtExceptionHandler {
var caught: Throwable? = null
override fun uncaughtException(thread: Thread, throwable: Throwable) {
caught = throwable
latch.countDown()
}
}
val disposable =
Single.create<Boolean> { source ->
Thread.sleep(200)
source.onSuccess(true)
}
.subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor {
Thread(it).apply {
uncaughtExceptionHandler = catchException
}
}))
.subscribe({
fail()
}, { })
disposable.dispose()
latch.await()
assertTrue(catchException.caught is UndeliverableException)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment