@gaplo917
Created November 27, 2022 01:36
Concurrent stream processing is easy in Kotlin using channelFlow, https://t.me/gaplotechd/24265
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class ItemDetails(val data: Any)

// Timestamp captured on first access; every log line is reported relative to it.
val current by lazy { System.currentTimeMillis() }
val timeDiff: Long get() = System.currentTimeMillis() - current

fun logWithElapseTime(s: String) = println("Elapsed ${timeDiff}ms - $s")

// Simulates a 100 ms IO call that returns three item URLs: "0", "1" and "2".
suspend fun getItemUrls(url: String): List<String> {
    100L.also {
        logWithElapseTime("[A1] before IO, url=$url, expected delay=$it")
        delay(it)
        logWithElapseTime("[A2] after IO, url=$url, expected delay=$it")
    }
    return List(3) { "$it" }
}

// Simulates an IO call whose latency shrinks as id grows (5 s, 4 s, 3 s),
// so the last item finishes first.
suspend fun getItemDetailUrl(id: Int, url: String): String {
    val delayMs = (5000L - id * 1000L).also {
        logWithElapseTime("[B1] before IO, url=$url, expected delay=$it")
        delay(it)
        logWithElapseTime("[B2] after IO, url=$url with delay $it")
    }
    return "$url+$delayMs"
}

// Simulates a 100 ms IO call that loads the details behind a detail URL.
suspend fun getItemDetails(url: String): ItemDetails {
    100L.also {
        logWithElapseTime("[C] before IO, url=$url, expected delay=$it")
        delay(it)
        logWithElapseTime("[C2] after IO, url=$url with delay $it")
    }
    return ItemDetails(Unit)
}

fun main() {
    runBlocking {
        channelFlow {
            // One coroutine per item URL, so the detail-URL lookups run concurrently.
            getItemUrls("http://domain").forEachIndexed { index, url ->
                launch {
                    send(getItemDetailUrl(index, url))
                }
            }
        }
            // Each detail URL is processed as soon as it arrives from the channel.
            .map { itemDetailUrl ->
                getItemDetails(itemDetailUrl)
            }
            // Runs the upstream (channelFlow and map) on Dispatchers.Default.
            .flowOn(Dispatchers.Default)
            .collect {
                logWithElapseTime("Done! $it")
            }
    }
}
@gaplo917
Author

Expected output

Elapsed 0ms - [A1] before IO, url=http://domain, expected delay=100
Elapsed 107ms - [A2] after IO, url=http://domain, expected delay=100
Elapsed 116ms - [B1] before IO, url=0, expected delay=5000
Elapsed 116ms - [B1] before IO, url=1, expected delay=4000
Elapsed 117ms - [B1] before IO, url=2, expected delay=3000
Elapsed 3121ms - [B2] after IO, url=2 with delay 3000
Elapsed 3127ms - [C] before IO, url=2+3000, expected delay=100
Elapsed 3229ms - [C2] after IO, url=2+3000 with delay 100
Elapsed 3235ms - Done! ItemDetails(data=kotlin.Unit)
Elapsed 4117ms - [B2] after IO, url=1 with delay 4000
Elapsed 4117ms - [C] before IO, url=1+4000, expected delay=100
Elapsed 4217ms - [C2] after IO, url=1+4000 with delay 100
Elapsed 4218ms - Done! ItemDetails(data=kotlin.Unit)
Elapsed 5122ms - [B2] after IO, url=0 with delay 5000
Elapsed 5122ms - [C] before IO, url=0+5000, expected delay=100
Elapsed 5223ms - [C2] after IO, url=0+5000 with delay 100
Elapsed 5223ms - Done! ItemDetails(data=kotlin.Unit)
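
The output above shows results arriving in completion order (item 2, then 1, then 0) rather than in input order, because each launched coroutine sends as soon as its own call returns. If input order matters, one possible alternative is `async` with `awaitAll`, which still starts every call at once but returns the results by position. The following is a minimal sketch of that difference, not part of the gist; `fetch`, `completionOrder` and `inputOrder` are hypothetical names, and the delays are shortened to 300/200/100 ms.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for a slow call: lower ids take longer, as in the gist.
suspend fun fetch(id: Int): Int {
    delay((3 - id) * 100L)
    return id
}

// Completion order: each launched coroutine sends as soon as its own call finishes.
suspend fun completionOrder(): List<Int> =
    channelFlow<Int> {
        repeat(3) { id -> launch { send(fetch(id)) } }
    }.toList()

// Input order: all calls still start together, but awaitAll returns results by position.
suspend fun inputOrder(): List<Int> = coroutineScope {
    (0 until 3).map { id -> async { fetch(id) } }.awaitAll()
}

fun main() = runBlocking {
    println(completionOrder()) // fastest call first
    println(inputOrder())      // same order as the input ids
}
```

The trade-off is that `awaitAll` only returns once the slowest call has finished, whereas the `channelFlow` version lets downstream work on each result start immediately.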

@gaplo917
Author

You can also run it online in the Kotlin Playground: https://pl.kotl.in/oOf0PXOyY
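
For comparison, here is a rough sketch of what the same kind of pipeline costs without `channelFlow`. A plain `flow { }` builder runs its block in a single coroutine and does not allow `emit` from another coroutine, so the calls run one after another and their delays add up. With `channelFlow`, each call gets its own coroutine and the total is roughly the slowest call. The names `slowCall`, `delaysMs`, `sequential` and `concurrent` are hypothetical and the delays are shortened; this is an illustration, not the gist's code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a slow call that takes delayMs to return.
suspend fun slowCall(delayMs: Long): Long {
    delay(delayMs)
    return delayMs
}

val delaysMs = listOf(300L, 200L, 100L)

// Sequential: flow { } runs its block in one coroutine, so the delays add up.
fun sequential(): Flow<Long> = flow {
    delaysMs.forEach { emit(slowCall(it)) }
}

// Concurrent: every call gets its own coroutine, so the slowest call dominates.
fun concurrent(): Flow<Long> = channelFlow {
    delaysMs.forEach { launch { send(slowCall(it)) } }
}

fun main() = runBlocking {
    val sequentialMs = measureTimeMillis { sequential().collect() }
    val concurrentMs = measureTimeMillis { concurrent().collect() }
    println("sequential=${sequentialMs}ms, concurrent=${concurrentMs}ms")
}
```

Under these assumptions the sequential version should take about 600 ms and the concurrent one about 300 ms, which matches the gist's log where the three detail-URL lookups overlap instead of queuing.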
