Skip to content

Instantly share code, notes, and snippets.

@thiyagu06
Last active December 30, 2019 11:08
Show Gist options
  • Save thiyagu06/974ede71f4ec0da24bee4574450ab6e7 to your computer and use it in GitHub Desktop.
Save thiyagu06/974ede71f4ec0da24bee4574450ab6e7 to your computer and use it in GitHub Desktop.
Infinite Flow Processor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
import kotlin.random.Random
/**
 * Collects the endless message stream produced by [MessageProvider] on a dedicated
 * three-thread pool until [stop] is called.
 *
 * Use case:
 * 1. poll data from some non-blocking IO (possibly an HTTP endpoint)
 * 2. process the data
 * 3. acknowledge that the data was processed
 *
 * An instance is single-use: once [stop] has been called the job is cancelled and the
 * thread pool is shut down, so [start] cannot be used again on the same instance.
 */
class InfiniteFlowProcessor : CoroutineScope {
    private val supervisorJob = SupervisorJob()

    // Created exactly once. Building the pool inside the `coroutineContext` getter would
    // allocate a fresh executor (and three non-daemon threads) on every access.
    private val dispatcher = Executors.newFixedThreadPool(3).asCoroutineDispatcher()

    // The job must be part of the scope's context; otherwise coroutines launched from
    // this scope are not children of `supervisorJob` and `stop()` cannot cancel them.
    override val coroutineContext: CoroutineContext = dispatcher + supervisorJob

    /**
     * Starts collecting the message flow in this scope.
     *
     * A flow is cold: without a terminal operator nothing is ever produced, so the flow
     * is launched with `launchIn`, which collects it in a new child coroutine.
     */
    fun start() {
        MessageProvider()
            .getMessage()
            .buffer(500)
            .launchIn(this)
    }

    /**
     * Cancels the running collection and releases the thread pool so its non-daemon
     * worker threads do not keep the JVM alive.
     */
    fun stop() {
        supervisorJob.cancel()
        dispatcher.close()
    }
}
/**
 * Demo entry point: starts an [InfiniteFlowProcessor], lets it run for three seconds,
 * stops it, then prints "completed".
 */
// NOTE(review): a second `main()` with the same signature is declared further down in this
// file. Kotlin rejects two identical top-level declarations, so one of them has to be
// removed before the file compiles.
fun main() = runBlocking {
    // `Experiments` is not declared anywhere in this file; the processor class defined
    // above is named InfiniteFlowProcessor.
    val experiments = InfiniteFlowProcessor()
    experiments.start()
    delay(3000)
    experiments.stop()
    println("completed")
}
/**
 * Source of an endless stream of random integers, standing in for data polled from a
 * non-blocking IO endpoint.
 */
class MessageProvider {
    /**
     * Builds a cold flow that, once collected, emits a random value in `0 until 1000`
     * roughly every 100 ms and logs the emitting thread before each send.
     *
     * @return a flow that keeps emitting until its collector is cancelled.
     */
    fun getMessage(): Flow<Int> = channelFlow {
        val producer = launch {
            while (isActive) {
                println("sending on ${Thread.currentThread().name}")
                // Pretend this value was received from non-blocking IO.
                val message = Random.nextInt(1000)
                send(message)
                delay(100)
            }
        }
        // Keep the channel open until the collector goes away, then stop the producer.
        awaitClose { producer.cancel() }
    }
}
/**
 * Demo entry point: runs an [InfiniteFlowProcessor] for three seconds, stops it, and
 * prints "completed".
 */
fun main() = runBlocking {
    val processor = InfiniteFlowProcessor()
    processor.start()
    // Pretend the app crashes or is terminated after three seconds.
    delay(3000)
    processor.stop()
    println("completed")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment