Skip to content

Instantly share code, notes, and snippets.

@octylFractal
Last active March 8, 2020 22:25
Show Gist options
  • Save octylFractal/7e07659b526460e4e041b777015bab23 to your computer and use it in GitHub Desktop.
Save octylFractal/7e07659b526460e4e041b777015bab23 to your computer and use it in GitHub Desktop.
flatMapSequential idea
package kotlin_playground
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.produceIn
/**
 * Transforms every upstream value into a flow with [toFlow] and flattens the
 * resulting flow of flows through [flattenSequential].
 *
 * @param toFlow suspending function that turns one upstream value into a flow.
 * @return a flow containing the elements of the flows produced by [toFlow].
 */
inline fun <T, R> Flow<T>.flatMapSequential(crossinline toFlow: suspend (T) -> Flow<R>): Flow<R> =
    map(toFlow).flattenSequential()
/**
 * Flattens a flow of flows into a single flow, emitting all elements of one
 * inner flow before moving on to the next one received from the outer flow.
 *
 * Every inner flow is turned into a channel with `produceIn` as soon as the
 * outer flow emits it, using an unlimited buffer, and that channel is queued
 * (also with unlimited capacity) for the emitting loop. All of this work runs
 * inside one `coroutineScope` per collection of the returned flow.
 *
 * @return a flow of the elements of the inner flows, drained one channel at a time.
 */
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun <T> Flow<Flow<T>>.flattenSequential(): Flow<T> {
    val upstream = this
    return flow {
        coroutineScope {
            // Named handle on this scope so inner channels are launched in it
            // rather than in the producer coroutine below.
            val outerScope = this
            val pending = produce<ReceiveChannel<T>>(capacity = Channel.UNLIMITED) {
                upstream.collect { inner ->
                    val innerChannel = inner
                        .buffer(capacity = Channel.UNLIMITED)
                        .produceIn(outerScope)
                    // ProducerScope is itself the send side of the produced channel.
                    send(innerChannel)
                }
            }
            // Drain the queued inner channels strictly in arrival order.
            for (innerChannel in pending) {
                emitAll(innerChannel)
            }
        }
    }
}
package kotlin_playground
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import java.util.concurrent.TimeUnit
/**
 * Demo for [flatMapSequential]: builds one flow per [Waits] entry that delays
 * for each configured wait and emits the entry's tag followed by the elapsed
 * milliseconds since start, then prints every emitted string to stderr.
 */
suspend fun main() {
    // Expected: a0, a1000, b500, b2000
    val start = System.nanoTime()
    val inputs = flowOf(Waits("a", 0, 1000), Waits("b", 500, 1500))
    val timed = inputs.flatMapSequential { entry ->
        flow {
            for (pauseMillis in entry.waits) {
                delay(pauseMillis.toLong())
                val elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
                emit("${entry.tag}$elapsedMillis")
            }
        }
    }
    timed.collect { line -> System.err.println(line) }
}
/**
 * Pairs a [tag] with the [waits] values supplied as trailing arguments.
 *
 * @property tag label identifying this entry.
 * @property waits the wait values, stored in the order they were passed.
 */
class Waits(
    val tag: String,
    vararg val waits: Int
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment