Skip to content

Instantly share code, notes, and snippets.

@thiyagu06
Last active January 3, 2020 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thiyagu06/549b03512818eac42c123c4bd0456003 to your computer and use it in GitHub Desktop.
Save thiyagu06/549b03512818eac42c123c4bd0456003 to your computer and use it in GitHub Desktop.
Understanding coroutine scope when producer and Consumer running on different coroutine context
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
fun main() = runBlocking {
val producerContext: CoroutineContext = Executors.newFixedThreadPool(1).asCoroutineDispatcher()
val consumerContext: CoroutineContext = Executors.newFixedThreadPool(2).asCoroutineDispatcher()
val dataProvider: Flow<Int> = channelFlow {
launch {
while (isActive) {
println("sending on ${Thread.currentThread().name}")
channel.send(Random.nextInt(1000)) // pretend receiving data from non-blocking io
delay(100)
}
}
}
val superVisorJob = Job()
val flowScope = CoroutineScope(superVisorJob + consumerContext)
dataProvider.flowOn(producerContext).onEach {
println("data received on ${Thread.currentThread().name}-->$it")
}.launchIn(flowScope)
delay(3000)
flowScope.cancel()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment