Skip to content

Instantly share code, notes, and snippets.

@jzallas
Created October 9, 2018 03:16
Show Gist options
  • Save jzallas/0a0c3f54c086d91a86238d3e8076fde0 to your computer and use it in GitHub Desktop.
Save jzallas/0a0c3f54c086d91a86238d3e8076fde0 to your computer and use it in GitHub Desktop.
Writing assertions against executing RxJava 2 schedulers in Kotlin
package com.example.test
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import io.reactivex.subjects.BehaviorSubject
import org.junit.Assert.*
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
/**
 * Demonstrates how to assert which scheduler an RxJava 2 stream delivered its events on.
 *
 * A [ControlledScheduler] renames the executing thread to the scheduler's name for the duration
 * of every task it runs, and a [NamedTestObserver] records the thread name it sees in each
 * observer callback. Comparing the two tells a test which scheduler delivered the last event.
 */
class SchedulerExecutionTests {
    lateinit var viewModel: ViewModel
    lateinit var mainScheduler: ControlledScheduler
    lateinit var ioScheduler: ControlledScheduler
    lateinit var testObserver: NamedTestObserver<String>

    /** Builds a fresh view model with named schedulers and subscribes the recording observer. */
    @Before
    fun setup() {
        mainScheduler = ControlledScheduler("TEST_MAIN")
        ioScheduler = ControlledScheduler("TEST_IO")
        viewModel = ViewModel(mainScheduler, ioScheduler)
        testObserver = viewModel.stringObservable.scheduledTest()
    }

    @Test
    fun `when perform work is called then two values are emitted`() {
        viewModel.performWork()

        // NOTE(review): these assertions run immediately after performWork(), so they rely on
        // the default trampoline-backed schedulers having finished the work by then.
        testObserver.assertScheduled(mainScheduler)
            .assertValueCount(2)
            .assertValues("test1", "test2")

        // After the work is done the calling thread must no longer carry a scheduler's name.
        // JUnit's argument order is (unexpected, actual).
        val currentThreadName = Thread.currentThread().name
        assertNotEquals(ioScheduler.name, currentThreadName)
        assertNotEquals(mainScheduler.name, currentThreadName)
    }

    /**
     * Subscribes a new [NamedTestObserver] to this observable.
     *
     * @return the subscribed observer, ready for scheduler and value assertions.
     */
    fun <T> Observable<T>.scheduledTest(): NamedTestObserver<T> {
        val recordingObserver: NamedTestObserver<T> = NamedTestObserver(TestObserver())
        subscribe(recordingObserver)
        return recordingObserver
    }

    /**
     * Minimal subject under test: produces its values via [ioScheduler] and publishes them to
     * [stringObservable] via [mainScheduler].
     *
     * @property mainScheduler scheduler the values are observed on before being published.
     * @property ioScheduler scheduler the source observable is subscribed on.
     */
    class ViewModel(
        val mainScheduler: Scheduler,
        val ioScheduler: Scheduler
    ) {
        /** Receives every value produced by [performWork]. */
        val stringObservable: BehaviorSubject<String> = BehaviorSubject.create()

        /**
         * Emits "test1" and "test2" into [stringObservable].
         *
         * The subscription returned by `subscribe()` is not retained, so callers cannot
         * dispose it, and no error handler is installed.
         */
        fun performWork() {
            Observable.fromArray("test1", "test2")
                .subscribeOn(ioScheduler)
                .observeOn(mainScheduler)
                .doOnNext { stringObservable.onNext(it) }
                .subscribe()
        }
    }

    /**
     * Observer that forwards every callback to [testObserver] after recording the name of the
     * thread the callback arrived on.
     *
     * @param testObserver the RxJava test observer that receives all events and provides the
     * value assertions.
     */
    class NamedTestObserver<T>(private val testObserver: TestObserver<T>) : Observer<T> by testObserver {
        /** Name of the thread that delivered the most recent callback, or `null` before any. */
        var lastKnownThreadName: String? = null

        /**
         * Asserts that the most recent callback arrived on a thread named after [scheduler].
         *
         * @param scheduler the scheduler expected to have delivered the last event.
         * @return the wrapped [TestObserver], so value assertions can be chained.
         */
        fun assertScheduled(scheduler: ControlledScheduler): TestObserver<T> {
            assertNotNull("no observer callback has been received yet", lastKnownThreadName)
            assertEquals(
                "last callback was delivered on an unexpected thread",
                scheduler.name,
                lastKnownThreadName
            )
            return testObserver
        }

        override fun onSubscribe(d: Disposable) {
            recordCurrentThread()
            testObserver.onSubscribe(d)
        }

        override fun onNext(t: T) {
            recordCurrentThread()
            testObserver.onNext(t)
        }

        override fun onComplete() {
            recordCurrentThread()
            testObserver.onComplete()
        }

        override fun onError(e: Throwable) {
            recordCurrentThread()
            testObserver.onError(e)
        }

        /** Remembers the name of the thread the current callback is running on. */
        private fun recordCurrentThread() {
            lastKnownThreadName = Thread.currentThread().name
        }
    }

    /**
     * Scheduler that delegates all work to [scheduler] but runs every task with the executing
     * thread temporarily renamed to [name].
     *
     * @property name the thread name visible to tasks while they run on this scheduler.
     * @param scheduler the scheduler that actually executes the work; defaults to
     * `Schedulers.trampoline()`.
     */
    class ControlledScheduler(
        val name: String,
        private val scheduler: Scheduler = Schedulers.trampoline()
    ) : Scheduler() {

        override fun createWorker(): Worker = ControlledWorker(scheduler.createWorker())

        /** Worker that wraps each task in a [NamedRunnable] before handing it to [worker]. */
        inner class ControlledWorker(private val worker: Worker) : Worker() {

            override fun isDisposed(): Boolean = worker.isDisposed

            override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable =
                worker.schedule(NamedRunnable(name, run), delay, unit)

            override fun dispose() {
                worker.dispose()
            }
        }

        /**
         * Runs [runnable] with the current thread renamed to [name], then restores the
         * thread's previous name.
         */
        class NamedRunnable(
            private val name: String,
            private val runnable: Runnable
        ) : Runnable {
            override fun run() {
                val thread = Thread.currentThread()
                val previousName = thread.name
                thread.name = name
                try {
                    runnable.run()
                } finally {
                    // Restore the name even when the task throws; otherwise the thread keeps
                    // the scheduler's name and later thread-name assertions become misleading.
                    thread.name = previousName
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment