Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
/**
 * Prints [message] to standard output, prefixed with the name of the
 * thread that called this function.
 *
 * Output format: `[<thread name>] : <message>`.
 */
fun log(message: String) {
    val threadName = Thread.currentThread().name
    println("[$threadName] : $message")
}
/** Data source that hands out a fixed list of integers. */
class MyRepository {
    /** Returns the integers 1 through 20 in ascending order. */
    fun getList(): List<Int> = List(20) { index -> index + 1 }
}
/**
 * Publishes the values supplied by [myRepository] through a [SharedFlow].
 *
 * The component is itself a [CoroutineScope] (delegating to a scope created
 * with [Dispatchers.IO]), so coroutines it launches run in that scope.
 */
class MyComponent(
    private val myRepository: MyRepository,
) : CoroutineScope by CoroutineScope(Dispatchers.IO) {

    // Replays the last 2 values to new subscribers and buffers 3 more on top
    // of that; when the buffer is full the oldest value is dropped.
    private val _mySharedFlow = MutableSharedFlow<Int>(
        replay = 2,
        extraBufferCapacity = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST,
    )

    /** Read-only view of the flow that [fetchAndEmitList] emits into. */
    val mySharedFlow: SharedFlow<Int>
        get() = _mySharedFlow

    /**
     * Launches a coroutine in this component's scope that emits every value
     * from the repository, logging each one and waiting 250 ms after each
     * emission.
     *
     * @return the [Job] of the launched coroutine.
     */
    fun fetchAndEmitList(): Job = launch {
        for (value in myRepository.getList()) {
            log("Emitting => $value")
            _mySharedFlow.emit(value)
            delay(250)
        }
    }

    /**
     * Cancels the coroutines launched in this component's scope.
     * Only the children of the scope's job are cancelled, not the job itself.
     */
    fun release() {
        coroutineContext.cancelChildren()
    }
}
/**
 * Demo entry point: starts an emitter on [MyComponent] and a collector that
 * subscribes late and processes values more slowly than they are emitted.
 *
 * Sequence inside `runBlocking`:
 * 1. A collector coroutine is launched; it waits 1250 ms, then collects from
 *    the shared flow, taking 600 ms per value.
 * 2. The emitter is started and awaited until it has emitted every value.
 * 3. The component is released, the collector is given another 2500 ms,
 *    and then it is cancelled.
 */
fun main() {
    log("Start")
    val component = MyComponent(MyRepository())

    runBlocking {
        // Late, slow subscriber.
        val collectorJob = launch {
            delay(1250)
            component.mySharedFlow
                .onStart { log("Collector started collecting...") }
                .collect { value ->
                    log("Collected => $value")
                    delay(600)
                }
        }

        // Run the emitter to completion before tearing anything down.
        val emitterJob = component.fetchAndEmitList()
        emitterJob.join()
        component.release()

        // Collecting a SharedFlow never completes on its own, so give the
        // collector some extra time and then cancel it explicitly.
        delay(2500)
        collectorJob.cancel()
    }

    log("End")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.