Skip to content

Instantly share code, notes, and snippets.

@thiyagu06
Created December 30, 2019 19:02
Show Gist options
  • Save thiyagu06/2afd4437ff9d0eccacafff0bd6216874 to your computer and use it in GitHub Desktop.
Save thiyagu06/2afd4437ff9d0eccacafff0bd6216874 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
/**
 * Collects the flow produced by [DataProvider] on a dedicated three-thread pool
 * and prints every received value.
 *
 * The instance is itself a [CoroutineScope]: its [coroutineContext] combines a
 * [SupervisorJob] with the pool-backed dispatcher, so everything launched through
 * it is cancelled by [stop].
 */
class TryChannelFlow : CoroutineScope {
    private val supervisorJob = SupervisorJob()

    // Created exactly once. Building the pool inside the property getter would
    // allocate a brand-new (never closed) executor on every context access.
    private val dispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()

    override val coroutineContext: CoroutineContext = supervisorJob + dispatcher

    /** Starts collecting the generated data in the background and prints each value. */
    fun start() {
        val data: Flow<Int> = DataProvider().generateData()
        data.onEach {
            println("received -->$it")
        }.launchIn(this)
    }

    /**
     * Cancels every coroutine started by this instance and shuts the thread pool down.
     *
     * Closing the dispatcher releases the pool's worker threads; without it the
     * non-daemon threads would outlive the cancelled coroutines.
     */
    fun stop() {
        supervisorJob.cancel()
        dispatcher.close()
    }
}
/** Source of demo data exposed as a cold [Flow]. */
class DataProvider {
    /**
     * Builds a flow that, while it is being collected, sends a random integer in
     * `0 until 1000` and then waits 100 ms before sending the next one.
     *
     * The sending coroutine is cancelled when the flow's channel is closed.
     */
    fun generateData(): Flow<Int> = channelFlow {
        val producer = launch {
            while (isActive) {
                println("sending on ${Thread.currentThread().name}")
                // pretend receiving data from non-blocking io
                send(Random.nextInt(1000))
                delay(100)
            }
        }
        // Keep the flow open until the channel is closed, then stop producing.
        awaitClose { producer.cancel() }
    }
}
/**
 * Demo entry point: starts the collector, lets it run for three seconds,
 * stops it, and prints "completed".
 */
fun main() {
    runBlocking {
        val collector = TryChannelFlow()
        collector.start()
        // pretend app crashes/terminated
        delay(3000)
        collector.stop()
        println("completed")
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment