Skip to content

Instantly share code, notes, and snippets.

@davibe
Created November 12, 2017 14:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davibe/b6eb141d957ef2bf0e573228f8f29bd9 to your computer and use it in GitHub Desktop.
Save davibe/b6eb141d957ef2bf0e573228f8f29bd9 to your computer and use it in GitHub Desktop.
I tried to implement combineLatest for kotlin channels with corutines
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.launch
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.selects.select
import java.util.*
/*
* Combine multiple channels into one `output` channel.
* Output contains last known data from all channels and sends data on every change.
* Output is closed when any other channel closes first.
*/
suspend fun <T> combineLatest(channels: List<Channel<T>>) : Channel<List<T?>> {
val state = Collections.synchronizedList(mutableListOf<T?>())
for (i in 0..channels.size - 1) { state.add(i, null) }
val output = Channel<List<T?>>()
for (channel in channels) {
launch {
var quit = false
do {
val value = channel.receiveOrNull()
if (value == null) {
output.close()
quit = true
return@launch
} else {
state.set(channels.indexOf(channel), value)
output.offer(state)
}
} while (!quit)
}
}
return output
}
fun main(args: Array<String>) = runBlocking<Unit> {
val channel = Channel<Int>()
val producer = launch {
for (x in 1..15) {
delay(1000)
println("0 ==> " + x)
channel.send(x)
}
channel.close() // we're done sending
}
val channel2 = Channel<Int>()
val producer2 = launch {
for (x in 1..15) {
delay(1100)
println("1 ==> " + x)
channel2.send(x)
}
channel.close() // we're done sending
}
val consumer = launch {
val output = combineLatest(listOf(channel, channel2))
do {
// delay(2000)
val values = output.receiveOrNull()
println(values)
} while (!output.isClosedForReceive)
}
consumer.join()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment