Skip to content

Instantly share code, notes, and snippets.

@saqib-github-commits
Created April 25, 2023 08:15
Show Gist options
  • Save saqib-github-commits/cd2ed899e1b1dceb6bb68a9ec0e0bfc3 to your computer and use it in GitHub Desktop.
Save saqib-github-commits/cd2ed899e1b1dceb6bb68a9ec0e0bfc3 to your computer and use it in GitHub Desktop.
private class CallbackFlowBuilder<T>(
private val block: suspend ProducerScope<T>.() -> Unit,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {
override suspend fun collectTo(scope: ProducerScope<T>) {
super.collectTo(scope)
/*
* We expect user either call `awaitClose` from within a block (then the channel is closed at this moment)
* or being closed/cancelled externally/manually. Otherwise "user forgot to call
* awaitClose and receives unhelpful ClosedSendChannelException exceptions" situation is detected.
*/
if (!scope.isClosedForSend) {
throw IllegalStateException(
"""
'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
Otherwise, a callback/listener may leak in case of external cancellation.
See callbackFlow API documentation for the details.
""".trimIndent()
)
}
}
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment