Created
March 17, 2021 05:59
-
-
Save pablobaxter/50cb9873f38bf6a0e2d72c769ac112f3 to your computer and use it in GitHub Desktop.
Flow timeout on the producer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.channels.produce | |
import kotlinx.coroutines.coroutineScope | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.collect | |
import kotlinx.coroutines.flow.flow | |
import java.util.concurrent.TimeoutException | |
/**
 * Returns a flow that re-emits the values of this flow, but fails with a [TimeoutException]
 * when the upstream producer needed more than [timeoutMillis] to deliver its next value.
 *
 * Only producer time is measured: the reference timestamp is reset after each value has been
 * handed over to the collector, so slow downstream processing does not count towards the timeout.
 * The elapsed time is checked only when the upstream emits a value; an upstream that stops
 * emitting without completing is not interrupted by this operator.
 *
 * @param timeoutMillis maximum producer time between two emissions, in milliseconds.
 *   Validated when the returned flow is collected.
 * @return a cold flow wrapping this flow.
 * @throws IllegalArgumentException (on collection) if [timeoutMillis] is negative.
 * @throws TimeoutException (on collection) if the producer exceeded [timeoutMillis].
 */
fun <T> Flow<T>.withTimeout(timeoutMillis: Long): Flow<T> {
    return flow { // New flow to wrap the old one
        coroutineScope {
            require(timeoutMillis >= 0L) { "timeout should not be negative" }
            // A channel carries each upstream emission to the collector. No buffer is requested,
            // so the producer suspends in `send()` until the collector takes the value.
            val values = produce<Any> {
                // Monotonic clock: unlike System.currentTimeMillis(), it is not affected by
                // wall-clock adjustments, so elapsed-time measurements cannot jump.
                var lastEmissionNanos = System.nanoTime() // Time just before collecting...
                try {
                    this@withTimeout.collect { value ->
                        // Dividing (instead of multiplying the timeout) avoids Long overflow
                        // for very large timeoutMillis values.
                        val elapsedMillis = (System.nanoTime() - lastEmissionNanos) / 1_000_000L
                        if (elapsedMillis > timeoutMillis) {
                            send(TIMEOUT) // Special signal: the producer was too slow
                        } else {
                            send(value ?: NULL) // Either the item or the placeholder for null
                        }
                        // Reset only after `send()` returned. `send()` stays suspended while the
                        // collector is busy, and that time must not be attributed to the producer.
                        lastEmissionNanos = System.nanoTime()
                    }
                } finally {
                    send(DONE) // Special signal to let the wrapping flow end
                }
            }
            // Await values from the producer...
            var lastValue = values.receive()
            while (lastValue !== DONE && !values.isClosedForReceive) {
                // A timeout signal aborts the collection with a TimeoutException
                if (lastValue === TIMEOUT) throw TimeoutException("Producer timed out")
                emit(NULL.unbox(lastValue)) // Emit the value from the producer (NULL becomes null)
                lastValue = values.receive() // Await the next value...
            }
        }
    }
}
// Credit where it is due: this mirrors the internal Symbol helper of kotlinx.coroutines, see
// https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/internal/Symbol.kt

/**
 * Base type for unique marker objects that are compared by identity.
 */
sealed class Symbol {
    /**
     * Maps [value] back to a `T`: yields `null` when [value] is this very symbol instance,
     * otherwise returns [value] itself, cast (unchecked) to `T`.
     */
    @Suppress("UNCHECKED_CAST")
    inline fun <T> unbox(value: Any?): T {
        val unwrapped: Any? = if (value === this) null else value
        return unwrapped as T
    }
}

/** Marker sent through the channel when the producer exceeded the timeout. */
object TIMEOUT : Symbol()

/** Marker sent through the channel when the producer has finished. */
object DONE : Symbol()

/** Placeholder sent through the channel in place of a `null` item. */
object NULL : Symbol()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment