Skip to content

Instantly share code, notes, and snippets.

@bartoszm
Created March 23, 2023 09:43
Show Gist options
  • Save bartoszm/04cd02a30610950e61064582aaeff3d7 to your computer and use it in GitHub Desktop.
Save bartoszm/04cd02a30610950e61064582aaeff3d7 to your computer and use it in GitHub Desktop.
Example of controling execution parallerisms with kotlin flows
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis
@OptIn(FlowPreview::class)
fun main() = runBlocking {
val urls = (1..1000).map { "https://example.com/todos/$it" }
val ioThreadPool = Executors.newCachedThreadPool().asCoroutineDispatcher()
for(mcr in (3..20).step(2)) {
val timeMs = measureTimeMillis {
val result = urls.asFlow()
.flatMapMerge(mcr) { url ->
flowOf(url).flowOn(ioThreadPool)
.map {
queryRestEndpoint(it)
}
}
.toList()
println("Visited ${result.size}")
}
println("Total time $timeMs for concurrency: $mcr")
}
ioThreadPool.close()
}
suspend fun queryRestEndpoint(url: String): String {
delay(100L)
return url
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment