Skip to content

Instantly share code, notes, and snippets.

@krishnabhargav
Last active October 17, 2019 03:14
Show Gist options
  • Save krishnabhargav/5c9d86d29aa583ff511eec94318806d2 to your computer and use it in GitHub Desktop.
Save krishnabhargav/5c9d86d29aa583ff511eec94318806d2 to your computer and use it in GitHub Desktop.
Using Flow to simulate a pull sequence
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.random.Random
/**
 * Demo entry point: merges three producer flows into one stream and consumes it.
 *
 * Each element is a pair of a message and a commit callback; the consumer prints
 * the message and then invokes the callback. "Done" is printed once the merged
 * flow has completed.
 */
@FlowPreview
fun main() = runBlocking {
    // All three sources are created up front: two fixed producers and one
    // produced by unfolding a counter that starts at 0.
    val sources = sequenceOf(
        eventStoreProducer(),
        kafkaProducer(),
        unfoldAsync(0, ::unfoldExample)
    )
    // Open question: conflation needs more study. When the stream is conflated, the
    // full sequence appeared to end as soon as the first source sequence ended.
    sources.asFlow().flattenMerge().collect { (payload, acknowledge) ->
        println("Consumer: Received=$payload")
        acknowledge()
        //readLine()
    }
    println("Done")
}
/**
 * Unfold step used by the demo: yields "Dynamic0" through "Dynamic4", then stops.
 *
 * @param it the current counter state
 * @return the next state (`it + 1`) paired with the output element (a label and a
 *   callback that prints "Done!!!"), or `null` once `it` has reached 5
 */
private suspend fun unfoldExample(it: Int): Pair<Int, Pair<String, () -> Unit>>? {
    // Terminal state: no delay, no element.
    if (it >= 5) return null

    // Suspend for a random duration below 1000 ms before producing the element.
    delay(Random.nextLong(1000))
    val onCommit: () -> Unit = { println("Done!!!") }
    return Pair(it + 1, Pair("Dynamic$it", onCommit))
}
/**
 * Zero-argument callback paired with each message emitted by the producers in this file;
 * the consumer in `main` invokes it after printing the message.
 */
typealias CommitOffset = () -> Unit
/**
 * Cold flow that emits two events, each paired with a commit callback.
 *
 * Before every emission the flow suspends for a random duration below 1000 ms and
 * prints a "fetching" line. Invoking an element's callback prints its commit message.
 */
fun eventStoreProducer(): Flow<Pair<String, CommitOffset>> = flow {
    // (label, message printed when the element is committed), in emission order.
    val scripted = listOf(
        "Event 1" to "Committing event 1",
        "Event 2" to "Committing event 2"
    )
    for ((label, commitMessage) in scripted) {
        delay(Random.nextLong(1000))
        println("EventProducer: Fetching another event")
        emit(label to { println(commitMessage) })
    }
}
/**
 * Cold flow that emits two messages, each paired with a commit callback.
 *
 * Before every emission the flow suspends for a random duration below 1000 ms and
 * prints a "fetching" line. Invoking an element's callback prints its commit message.
 */
fun kafkaProducer(): Flow<Pair<String, CommitOffset>> = flow {
    // (label, message printed when the element is committed), in emission order.
    val scripted = listOf(
        "Message 1" to "Committing offset 1",
        "Message 2" to "Committing offset 2"
    )
    for ((label, commitMessage) in scripted) {
        delay(Random.nextLong(1000))
        println("KafkaProducer: Fetching another message")
        emit(label to { println(commitMessage) })
    }
}
//An Option type is awkward to implement in Kotlin, so this example simply uses a nullable result instead.
//sealed class Option<T> {
// object None : Option<Nothing>()
// data class Some<T>(val a: T) : Option<T>()
//}
/**
 * Builds a flow by repeatedly applying a suspending step function to a state.
 *
 * Starting from [initialState], each call to [unfolder] either returns a pair of
 * (next state, value to emit) or `null`. The value is emitted and the next state is
 * fed into the following call; the flow completes on the first `null`.
 *
 * @param initialState state passed to the first [unfolder] call
 * @param unfolder suspending step function; returning `null` ends the flow
 * @return a flow of the values produced by [unfolder], in order
 */
fun <T, S> unfoldAsync(initialState: S, unfolder: suspend (S) -> Pair<S, T>?): Flow<T> = flow {
    var state = initialState
    while (true) {
        // A null step means the sequence is exhausted.
        val (upcoming, element) = unfolder(state) ?: break
        emit(element)
        state = upcoming
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment