Skip to content

Instantly share code, notes, and snippets.

@afollestad
Created February 1, 2020 04:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afollestad/75313f90edc5984475eda4e726e94047 to your computer and use it in GitHub Desktop.
Save afollestad/75313f90edc5984475eda4e726e94047 to your computer and use it in GitHub Desktop.
@file:Suppress("unused", "MemberVisibilityCanBePrivate")
package com.afollestad.flow.util
import com.google.common.truth.Truth.assertThat
import com.google.common.truth.Truth.assertWithMessage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
@ExperimentalCoroutinesApi
class TestCollector<T : Any> internal constructor(
flow: Flow<T>
) {
private val job = Job()
private val context = CoroutineScope(Unconfined + job)
private val actual = mutableListOf<T>()
@PublishedApi internal var error: Throwable? = null
init {
flow.onEach { actual.add(it) }
.catch { error = it }
.onCompletion { job.cancel() }
.launchIn(context)
}
fun assertValues(vararg expected: T) = apply {
assertThat(expected.toList()).isEqualTo(actual)
}
fun assertValuesAndClear(vararg expected: T) = apply {
assertValues(*expected)
clear()
}
fun clear() = apply { actual.clear() }
fun assertNoValues() = apply {
assertWithMessage("Flow is not empty")
.that(actual)
.isEmpty()
}
inline fun <reified T : Throwable> assertError(
expectedMessage: String? = null
) = apply {
assertThat(error).isNotNull()
with(error!!) {
assertThat(this).isInstanceOf(T::class.java)
message?.let { assertThat(it).isEqualTo(expectedMessage) }
}
}
fun assertNoError() = apply {
assertWithMessage("An error occurred in the stream")
.that(error)
.isNull()
}
fun assertNotComplete() = apply {
assertWithMessage("Flow is complete")
.that(job.isActive)
.isTrue()
}
fun assertComplete() = apply {
assertWithMessage("Flow is not complete")
.that(job.isActive)
.isFalse()
}
fun cancel() = job.cancel()
}
@ExperimentalCoroutinesApi fun <T : Any> Flow<T>.test(): TestCollector<T> {
return TestCollector(this)
}
package com.afollestad.flow.util
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import org.junit.Test
@FlowPreview
@ExperimentalCoroutinesApi
class TestCollectorTest {
@Test fun `assert values`() {
flowOf(1, 2, 3)
.test()
.assertValues(1, 2, 3)
.assertNoError()
.assertComplete()
}
@Test fun `assert values and clear`() = runBlocking<Unit> {
val channel = BroadcastChannel<Int>(BUFFERED)
val collector = channel.asFlow()
.test()
channel.send(1)
collector
.assertNotComplete()
.assertNoError()
.assertValuesAndClear(1)
channel.send(2)
collector
.assertNotComplete()
.assertNoError()
.assertValuesAndClear(2)
channel.cancel()
collector.assertComplete()
}
@Test fun `assert no values`() {
emptyFlow<Int>()
.test()
.assertComplete()
.assertNoError()
.assertNoValues()
}
@Test fun `assert error`() {
val flow: Flow<Int> = flow {
emit(1)
error("Oh no!")
}
flow.test()
.assertValues(1)
.assertError<IllegalStateException>("Oh no!")
.assertComplete()
}
@Test fun clear() {
flowOf(1, 2, 3)
.test()
.clear()
.assertNoValues()
.assertNoError()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment