Skip to content

Instantly share code, notes, and snippets.

@PatilShreyas
Last active October 22, 2023 07:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PatilShreyas/36c55d694dbd0fee559e0b85c677578f to your computer and use it in GitHub Desktop.
Save PatilShreyas/36c55d694dbd0fee559e0b85c677578f to your computer and use it in GitHub Desktop.
suspend fun collect(collector: FlowCollector<List<T>>) = coroutineScope<Unit> {
// For storing un-emitted values
val values = mutableListOf<T>()
+ var isFlowCompleted = false
// Continue looping after intervals `duration` and emit the items in the collector
// and clear the existing items from the `values`.
launch {
while (true) {
delay(duration)
+ // If the upstream flow has been completed and there are no values
+ // pending to emit in the collector, just break this loop.
+ if (isFlowCompleted && values.isEmpty()) {
+ break
+ }
collector.emit(values.toList())
values.clear()
}
}
// Collect the upstream flow and add the items to the above `values` list
upstream.collect { ... }
// If we reach here it means the upstream flow has been completed and won't
// produce any values anymore. So set the flag as flow is completed so that
// child coroutine will break its loop
+ isFlowCompleted = true
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment