Skip to content

Instantly share code, notes, and snippets.

@LouisCAD
Last active December 13, 2022 16:05
Show Gist options
  • Save LouisCAD/93a6e14a91b950a0a91796ab957836b5 to your computer and use it in GitHub Desktop.
Save LouisCAD/93a6e14a91b950a0a91796ab957836b5 to your computer and use it in GitHub Desktop.
Collect a Flow in parallel
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
/**
 * Collects this flow and hands each emitted value to one of [concurrency] worker coroutines.
 *
 * Values are passed to the workers through a [Channel] created with the default capacity, so each
 * value is received by exactly one worker, and the upstream collection suspends in `send` until a
 * worker is ready to take the value. The workers are launched in the enclosing [coroutineScope],
 * so this function returns only after the flow has completed and every worker has finished
 * processing; a failure in [collector] or in the upstream flow fails the whole call.
 *
 * Workers run concurrently, so the order in which values are processed is not guaranteed to match
 * the emission order.
 *
 * @param concurrency number of worker coroutines to launch; must be positive.
 * @param collector suspending action invoked once per emitted value.
 * @throws IllegalArgumentException if [concurrency] is not positive.
 */
suspend fun <T> Flow<T>.collectInParallel(
    concurrency: Int,
    collector: suspend (T) -> Unit
) = coroutineScope {
    // With no worker receiving from the channel, the first send would suspend forever.
    require(concurrency > 0) { "concurrency must be positive, but was $concurrency" }

    val channel: Channel<T> = Channel()
    repeat(concurrency) {
        launch {
            for (element in channel) collector(element)
        }
    }
    collect { value -> channel.send(value) }
    // Closing ends the workers' `for` loops once the remaining values are received.
    // Kept as the last expression so the function's inferred return type is unchanged.
    channel.close()
}
/**
 * Collects this flow and distributes the emitted values among the given [collectors].
 *
 * One worker coroutine is launched per entry of [collectors]; all of them receive from a single
 * shared [Channel] created with the default capacity, so each value is handled by exactly one of
 * the collectors (whichever worker receives it), not by all of them. The upstream collection
 * suspends in `send` until a worker is ready to take the value. The workers are launched in the
 * enclosing [coroutineScope], so this function returns only after the flow has completed and every
 * worker has finished; a failure in any collector or in the upstream flow fails the whole call.
 *
 * @param collectors suspending actions, each run by its own worker coroutine; must not be empty.
 * @throws IllegalArgumentException if [collectors] is empty.
 */
suspend fun <T> Flow<T>.collectInParallel(
    collectors: List<suspend (T) -> Unit>
) = coroutineScope {
    // With no worker receiving from the channel, the first send would suspend forever.
    require(collectors.isNotEmpty()) { "collectors must not be empty" }

    val channel: Channel<T> = Channel()
    for (collector in collectors) {
        launch {
            for (element in channel) collector(element)
        }
    }
    collect { value -> channel.send(value) }
    // Closing ends the workers' `for` loops once the remaining values are received.
    // Kept as the last expression so the function's inferred return type is unchanged.
    channel.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment