Skip to content

Instantly share code, notes, and snippets.

@gaplo917
Created November 27, 2022 01:36
Show Gist options
  • Save gaplo917/f56d5d04ee7e94cd83aa4063cc4bb654 to your computer and use it in GitHub Desktop.
Save gaplo917/f56d5d04ee7e94cd83aa4063cc4bb654 to your computer and use it in GitHub Desktop.
Concurrent Stream Processing is easy in Kotlin using ChannelFlow, https://t.me/gaplotechd/24265
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
data class ItemDetails(val data: Any)
val current by lazy { System.currentTimeMillis() }
val timeDiff: Long get() = System.currentTimeMillis() - current
fun logWithElapseTime(s: String) = println("Elapsed ${timeDiff}ms - $s")
suspend fun getItemUrls(url: String): List<String> {
100L.also {
logWithElapseTime("[A1] before IO, url=$url, expected delay=$it")
delay(it)
logWithElapseTime("[A2] after IO, url=$url, expected delay=$it")
}
return Array(3) { "$it" }.toList()
}
suspend fun getItemDetailUrl(id: Int, url: String): String {
val delay = (5000L - id * 1000L).also {
logWithElapseTime("[B1] before IO, url=$url, expected delay=$it")
delay(it)
logWithElapseTime("[B2] after IO, url=$url with delay $it")
}
return "$url+$delay"
}
suspend fun getItemDetails(url: String): ItemDetails {
100L.also {
logWithElapseTime("[C] before IO, url=$url, expected delay=$it")
delay(it)
logWithElapseTime("[C2] after IO, url=$url with delay $it")
}
return ItemDetails(Unit)
}
fun main() {
runBlocking {
channelFlow {
getItemUrls("http://domain").forEachIndexed { index, url ->
launch {
send(getItemDetailUrl(index, url))
}
}
}
.map { itemDetailUrl ->
getItemDetails(itemDetailUrl)
}
.flowOn(Dispatchers.Default)
.collect {
logWithElapseTime("Done! $it")
}
}
}
@gaplo917
Copy link
Author

You can also run it on Kotlin playground online https://pl.kotl.in/oOf0PXOyY.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment