Skip to content

Instantly share code, notes, and snippets.

@pablobaxter
Created March 17, 2021 05:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pablobaxter/50cb9873f38bf6a0e2d72c769ac112f3 to your computer and use it in GitHub Desktop.
Save pablobaxter/50cb9873f38bf6a0e2d72c769ac112f3 to your computer and use it in GitHub Desktop.
Flow timeout on the producer
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import java.util.concurrent.TimeoutException
fun <T> Flow<T>.withTimeout(timeoutMillis: Long): Flow<T> {
return flow { // New flow to wrap the old one
coroutineScope {
require(timeoutMillis >= 0L) { "timeout should not be negative" } // Obviously...
// Using a Channel to collect each flow emission. We don't use a buffer and the buffer overflow strategy is to suspend.
val values = produce {
var lastTimestamp = System.currentTimeMillis() // Time just before collecting...
try {
collect { // Collect on the producer flow
// Measure current time with last time, and send special signal for timeout
if (System.currentTimeMillis() - lastTimestamp > timeoutMillis) {
send(TIMEOUT)
} else {
send(it ?: NULL) // Either the item or a special signal for NULL. Similar to internal coroutine code
}
// We reset the timestamp here! The reason being is that the `flow.emit()` suspends, which in turn suspends `send()`.
// We only want to measure a timeout if the producer took longer than `timeoutMillis`, not producer + consumer
lastTimestamp = System.currentTimeMillis()
}
} finally {
send(DONE) // Special signal to let flow end
}
}
// Await for values from our producer now...
var lastValue = values.receive()
while (lastValue !== DONE && !values.isClosedForReceive) {
// If we got a timeout signal, throw the TimeoutException
if (lastValue === TIMEOUT) throw TimeoutException("Producer timed out")
emit(NULL.unbox(lastValue)) // Emit the value from the producer
lastValue = values.receive() // Await the next value...
}
}
}
}
// Giving credit where it's due...
// Basically ripped from: https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/internal/Symbol.kt
sealed class Symbol {
inline fun <T> unbox(value: Any?): T = if (value === this) null as T else value as T
}
object TIMEOUT: Symbol()
object DONE: Symbol()
object NULL: Symbol()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment