Skip to content

Instantly share code, notes, and snippets.

@jonreeve
Last active May 27, 2021 09:53
Show Gist options
  • Save jonreeve/49e4a4826dc772002df287de90769c9c to your computer and use it in GitHub Desktop.
Save jonreeve/49e4a4826dc772002df287de90769c9c to your computer and use it in GitHub Desktop.
Turbine but Arrange-Act-Assert
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package turbinecopy
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
import kotlinx.coroutines.Dispatchers.Unconfined
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
// When true, the collector and FlowTurbine.check print trace messages to stdout via println.
private const val debug = true
/**
 * Terminal flow operator that starts collecting events from the given flow and allows them to be
 * validated against LATER, through [FlowTurbine.check].
 *
 * ```kotlin
 * // `this` is the CoroutineScope of the surrounding test, e.g. inside runBlockingTest { ... }
 * val collected = flowOf("one", "two").test(this)
 * collected.check {
 *     assertEquals("one", expectItem())
 *     assertEquals("two", expectItem())
 *     expectComplete()
 * }
 * ```
 *
 * Based on https://github.com/cashapp/turbine but with this API change, making it compatible with the
 * Arrange-Act-Assert structure of tests, and making it work better when testing flows that may conflate.
 *
 * @param scope scope in which the collecting coroutine is launched (with `start = UNDISPATCHED` on the
 * `Unconfined` dispatcher).
 * @param timeoutMillis duration in millis each suspending function on [FlowTurbine] will wait before timing out.
 * A value of `0` disables the timeout.
 * @return a [FlowTurbine] that buffers every item, plus the completion or error, for later validation.
 */
@ExperimentalCoroutinesApi // For start=UNDISPATCHED
suspend fun <T> Flow<T>.test(
    scope: CoroutineScope,
    timeoutMillis: Long = 1_000L
): FlowTurbine<T> {
    // Unlimited capacity: the collector buffers everything emitted before the assertions run.
    val events = Channel<Event<T>>(UNLIMITED)
    val collectJob = scope.launch(start = UNDISPATCHED, context = Unconfined) {
        val terminalEvent = try {
            if (debug) println("Collect starting!")
            collect { item ->
                if (debug) println("Collect got: $item")
                events.send(Event.Item(item))
            }
            if (debug) println("Collect complete!")
            Event.Complete
        } catch (_: CancellationException) {
            // Cancellation is not reported as an event; the channel is simply closed below.
            if (debug) println("Collect canceled!")
            null
        } catch (t: Throwable) {
            if (debug) println("Collect error! $t")
            Event.Error(t)
        }
        if (terminalEvent != null) {
            events.send(terminalEvent)
        }
        if (debug) println("Collect closing event channel")
        events.close()
    }
    // No cast needed: ChannelBasedFlowTurbine<T> already is a FlowTurbine<T>.
    return ChannelBasedFlowTurbine(events, collectJob, timeoutMillis)
}
/**
 * Represents active collection on a source [Flow] which buffers item emissions, completion,
 * and/or errors as events for consuming.
 */
interface FlowTurbine<T> {
    /**
     * Duration that [expectItem], [expectComplete], and [expectError] will wait for an event before
     * throwing a timeout exception.
     */
    val timeoutMillis: Long

    /**
     * Cancel collecting events from the source [Flow]. Any events which have already been received
     * will still need to be consumed using the "expect" functions.
     */
    suspend fun cancel()

    /**
     * Cancel collecting events from the source [Flow] and ignore any events which have already
     * been received. Calling this function will exit the [check] block.
     */
    suspend fun cancelAndIgnoreRemainingEvents(): Nothing

    /**
     * Assert that there are no unconsumed events which have been received.
     *
     * @throws AssertionError if unconsumed events are found.
     */
    fun expectNoEvents()

    /**
     * Assert that the next event received was an item and return it.
     * If no events have been received, this function will suspend for up to [timeoutMillis].
     *
     * @throws AssertionError if the next event was completion or an error.
     * @throws TimeoutCancellationException if no event was received in time.
     */
    suspend fun expectItem(): T

    /**
     * Assert that the next event received was the flow completing.
     * If no events have been received, this function will suspend for up to [timeoutMillis].
     *
     * @throws AssertionError if the next event was an item or an error.
     * @throws TimeoutCancellationException if no event was received in time.
     */
    suspend fun expectComplete()

    /**
     * Assert that the next event received was an error terminating the flow.
     * If no events have been received, this function will suspend for up to [timeoutMillis].
     *
     * @throws AssertionError if the next event was an item or completion.
     * @throws TimeoutCancellationException if no event was received in time.
     */
    suspend fun expectError(): Throwable

    /**
     * Assert that every event received so far has been consumed by an "expect" function.
     *
     * @throws AssertionError if unconsumed events are found.
     */
    fun ensureAllEventsConsumed()

    /**
     * Runs [validate] against the events collected so far, then cancels collection.
     *
     * Collection is cancelled however [validate] exits, including when it throws, so a failing
     * assertion does not leave the collecting coroutine running. When [validate] returns normally,
     * [ensureAllEventsConsumed] is called afterwards. When it exits through
     * [cancelAndIgnoreRemainingEvents], leftover events are not checked. Any other exception from
     * [validate], including other cancellations, is rethrown.
     *
     * @param validate assertions to run with this turbine as receiver.
     */
    suspend fun check(validate: suspend FlowTurbine<T>.() -> Unit) {
        if (debug) println("Called check")
        val ensureConsumed = try {
            if (debug) println("Calling validate lambda")
            validate()
            if (debug) println("Validate lambda returning normally")
            true
        } catch (e: CancellationException) {
            // Only the shared sentinel instance means "ignore remaining events"; compare by identity.
            if (e !== ignoreRemainingEventsException) {
                if (debug) println("Validate canceling $e")
                throw e
            }
            if (debug) println("Validate ignoring remaining events")
            false
        } finally {
            // Always stop collecting, even when validate failed with an assertion error.
            if (debug) println("Calling cancel")
            cancel()
        }
        if (ensureConsumed) {
            ensureAllEventsConsumed()
        }
    }
}
// Single shared sentinel: thrown by cancelAndIgnoreRemainingEvents() and recognised in
// FlowTurbine.check by reference identity (!==), so it must stay one shared instance.
private val ignoreRemainingEventsException = CancellationException("Ignore remaining events")
/**
 * One buffered occurrence from the collected flow: an emitted [Item], normal completion
 * ([Complete]), or a failure ([Error]).
 */
private sealed class Event<out T> {
    /** The flow finished collecting without throwing. */
    object Complete : Event<Nothing>() {
        override fun toString(): String {
            return "Complete"
        }
    }

    /** The flow terminated with [throwable]; rendered using only the throwable's simple class name. */
    data class Error(val throwable: Throwable) : Event<Nothing>() {
        override fun toString(): String {
            val typeName = throwable::class.simpleName
            return "Error($typeName)"
        }
    }

    /** The flow emitted [value]. */
    data class Item<T>(val value: T) : Event<T>() {
        override fun toString(): String {
            return "Item($value)"
        }
    }
}
/**
 * [FlowTurbine] that reads its events from a [Channel] filled by a separate collecting [Job].
 *
 * @param events channel the "expect" functions receive from.
 * @param collectJob job that is cancelled by [cancel].
 * @param timeoutMillis how long each suspending "expect" call waits for an event; `0` waits without a timeout.
 */
private class ChannelBasedFlowTurbine<T>(
    private val events: Channel<Event<T>>,
    private val collectJob: Job,
    override val timeoutMillis: Long
) : FlowTurbine<T> {

    /** Receives the next event, bounded by [timeoutMillis] unless that is zero. */
    private suspend fun awaitEvent(): Event<T> =
        when (timeoutMillis) {
            0L -> events.receive()
            else -> withTimeout(timeoutMillis) { events.receive() }
        }

    /** Builds and throws the assertion failure for an [event] that did not match [expected]. */
    private fun unexpectedEvent(event: Event<*>, expected: String): Nothing {
        // An error event carries its throwable along as the cause of the assertion failure.
        val cause = if (event is Event.Error) event.throwable else null
        throw AssertionError("Expected $expected but found $event", cause)
    }

    override suspend fun cancel() {
        collectJob.cancel()
    }

    override suspend fun cancelAndIgnoreRemainingEvents(): Nothing {
        collectJob.cancel()
        throw ignoreRemainingEventsException
    }

    override fun expectNoEvents() {
        // poll() does not suspend; a non-null result means an event was already waiting.
        events.poll()?.let { pending -> unexpectedEvent(pending, "no events") }
    }

    override suspend fun expectItem(): T =
        when (val event = awaitEvent()) {
            is Event.Item -> event.value
            else -> unexpectedEvent(event, "item")
        }

    override suspend fun expectComplete() {
        val event = awaitEvent()
        if (event == Event.Complete) return
        unexpectedEvent(event, "complete")
    }

    override suspend fun expectError(): Throwable =
        when (val event = awaitEvent()) {
            is Event.Error -> event.throwable
            else -> unexpectedEvent(event, "error")
        }

    override fun ensureAllEventsConsumed() {
        val leftovers = ArrayList<Event<T>>()
        var failure: Throwable? = null
        // Drain whatever is still buffered, remembering the throwable of an error event.
        var next = events.poll()
        while (next != null) {
            leftovers.add(next)
            if (next is Event.Error) {
                // Resolves to the stdlib check(Boolean): at most one error event is expected.
                check(failure == null)
                failure = next.throwable
            }
            next = events.poll()
        }
        if (debug) println("Unconsumed events: $leftovers")
        if (leftovers.isEmpty()) return
        throw AssertionError(
            leftovers.joinToString(separator = "", prefix = "Unconsumed events found:") { "\n - $it" },
            failure
        )
    }
}
/**
 * File-private [kotlin.AssertionError] that also carries a [cause]. It exists to work around three bugs:
 *
 * 1. No two-arg constructor in common (https://youtrack.jetbrains.com/issue/KT-40728).
 * 2. No two-arg constructor in Java 6.
 * 3. Public exceptions with public constructors have referential equality broken by coroutines.
 *
 * @param message failure description handed to the superclass.
 * @param cause underlying throwable, or `null` when there is none.
 */
private class AssertionError(
    message: String,
    cause: Throwable?
) : kotlin.AssertionError(message) {
    override val cause: Throwable? = cause
}
package playground
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runBlockingTest
import org.junit.Test
import org.junit.Assert.*
import turbinecopy.test
import java.lang.Exception
import kotlin.time.seconds
/**
 * Shows the Arrange-Act-Assert style enabled by [test]: collection starts during Arrange, so the
 * value set during Act is recorded as its own event instead of being conflated with the initial one.
 */
class StateFlowTest {
    @Test
    fun `Conflation could be avoided if Turbine's API was different`() = runBlockingTest {
        // Arrange - not bound up with assert, so conflation won't happen
        val stateFlow = MutableStateFlow(0)
        val collected = stateFlow.test(this)
        // Act
        stateFlow.value = 1
        // Assert - JUnit's assertEquals takes (expected, actual), so the literal goes first
        collected.check {
            assertEquals(0, expectItem())
            assertEquals(1, expectItem())
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment