Skip to content

Instantly share code, notes, and snippets.

@thiyagu06
Last active January 3, 2020 12:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save thiyagu06/9813da5a5b12ad91ce0ad8b26043d867 to your computer and use it in GitHub Desktop.
Save thiyagu06/9813da5a5b12ad91ce0ad8b26043d867 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
class InfiniteFlowProcessor : CoroutineScope {
private val job = Job()
private
val dispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
override val coroutineContext: CoroutineContext
get() = dispatcher
private val scope = CoroutineScope(job + coroutineContext)
//var stop: Job = null
fun start() {
val data: Flow<Int> =
MessageProvider().getMessage()
data.buffer(500).onEach {
println("receiving $it on ${Thread.currentThread().name}")
}.catch{
println("exception caught")
}.launchIn(scope) // start producing elements without terminal operator.
}
fun stop() {
job.cancel()
dispatcher.close()
}
}
class MessageProvider {
fun getMessage(): Flow<Int> {
return channelFlow {
val job = launchMessageReceiver(channel)
awaitClose {
job.cancel()
}
}
}
}
fun CoroutineScope.launchMessageReceiver(channel: SendChannel<Int>) = launch {
while (isActive) {
println("sending on ${Thread.currentThread().name}")
channel.send(Random.nextInt(1000)) // pretend receiving data from non-blocking io
delay(100)
}
}
fun main() = runBlocking {
val experiments = InfiniteFlowProcessor()
experiments.start()
delay(3000) // pretend app crashes/terminated
experiments.stop()
println("completed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment