Skip to content

Instantly share code, notes, and snippets.

@houssemzaier
Created November 5, 2020 19:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save houssemzaier/315a93adbb9b997b51d874ef07a3cc2b to your computer and use it in GitHub Desktop.
Save houssemzaier/315a93adbb9b997b51d874ef07a3cc2b to your computer and use it in GitHub Desktop.
Pushing data to one Flow from two twoCoroutines (Senders) and dispatching the received data to 2 receivers (collectors) that are collecting from one sharedFlow (dispatcher)
package fr.francetv.francetvsport.arch.application
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.collect
fun main() = runBlocking {
// val parentJob = coroutineContext[Job]!! don't kill the job comeing from runblocking this job refuse cancelation and throw exception
val parentJob = Job()
val parentExceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
println("exception !! -> $throwable")
}
val parentContext = parentJob + parentExceptionHandler
launch(parentContext) {
val origin: Flow<Comparable<*>> = channelFlow {
launch {
for (i in 1..50) {
delay(500)
send(i)
println("sent the number : $i")
}
}.invokeOnCompletion {
println("****1st sender is finished")
}
launch {
for (c in 'a'..'z') {
delay(2000)
send(c)
println("sent the char : $c")
}
}.invokeOnCompletion {
println("****2nd sender is finished")
}
awaitClose {
println("****closing channelFlow")
}
}
val shared = MutableStateFlow<Any?>(null)
launch {
while (isActive) {
origin.collect {
println(" dispatcher consumed this: $it")
shared.value = it
println(" dispatcher send this: $it")
}
}
}.invokeOnCompletion {
println("****dispatcher is finished")
}
launch {
while (isActive) {
delay(1000)
val value = shared.value
println("1st consumer consumed this: $value")
}
}.invokeOnCompletion {
println("****1st consumer is finished")
}
launch {
while (isActive) {
delay(1000)
val value = shared.value
println("2st consumer consumed this: $value")
}
}.invokeOnCompletion {
println("****2st consumer is finished")
}
launch {
delay(10_000)
val jobParent = parentContext[Job]
println("closing my parent $jobParent")
parentJob.cancel()
}.invokeOnCompletion {
println("****closer is finished")
}
}.join()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment