-
-
Save saqib-github-commits/cd2ed899e1b1dceb6bb68a9ec0e0bfc3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A [ChannelFlowBuilder] that additionally verifies, once [block] has finished, that the
 * producer scope has been closed for sending, and fails with a descriptive
 * [IllegalStateException] when it has not.
 *
 * @param block the producer body executed against the [ProducerScope].
 * @param context forwarded unchanged to [ChannelFlowBuilder].
 * @param capacity forwarded unchanged to [ChannelFlowBuilder].
 * @param onBufferOverflow forwarded unchanged to [ChannelFlowBuilder].
 */
private class CallbackFlowBuilder<T>(
    private val block: suspend ProducerScope<T>.() -> Unit,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowBuilder<T>(block, context, capacity, onBufferOverflow) {

    /**
     * Delegates to the parent implementation and then checks that [scope] is closed for send.
     *
     * @throws IllegalStateException if [scope] is still open for send after the parent
     * implementation returns.
     */
    override suspend fun collectTo(scope: ProducerScope<T>) {
        super.collectTo(scope)
        /*
         * We expect the user either to call `awaitClose` from within the block (then the channel
         * is closed at this moment) or the channel to be closed/cancelled externally/manually.
         * Otherwise the "user forgot to call awaitClose and receives unhelpful
         * ClosedSendChannelException exceptions" situation is detected.
         */
        check(scope.isClosedForSend) {
            """
                'awaitClose { yourCallbackOrListener.cancel() }' should be used in the end of callbackFlow block.
                Otherwise, a callback/listener may leak in case of external cancellation.
                See callbackFlow API documentation for the details.
            """.trimIndent()
        }
    }

    /**
     * Returns a new [CallbackFlowBuilder] that wraps the same [block] with the given
     * [context], [capacity] and [onBufferOverflow].
     */
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<T> =
        CallbackFlowBuilder(block, context, capacity, onBufferOverflow)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment