Skip to content

Instantly share code, notes, and snippets.

@hannesstruss
Last active August 6, 2018 06:57
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hannesstruss/927ec8120d7cb312d80685f230d50c6e to your computer and use it in GitHub Desktop.
Save hannesstruss/927ec8120d7cb312d80685f230d50c6e to your computer and use it in GitHub Desktop.
Attempt at implementing RxJava's switchMap for Kotlin Coroutine Channels
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumes
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.selects.whileSelect
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Maps every element of this channel to an inner channel via [transform] and forwards the
 * elements of the most recently created inner channel to the returned channel.
 *
 * When a new element arrives from the source, the previous inner channel is cancelled and
 * replaced by `transform(element)`.
 *
 * Completion: the returned channel completes as soon as *either* the source channel *or* the
 * current inner channel is closed (both select clauses stop the loop on `null`). Note that this
 * differs from RxJava's `switchMap`, which keeps waiting for further source elements after an
 * inner stream completes. Because `onReceiveOrNull` is used, a `null` element from either
 * channel is indistinguishable from "closed" and also ends the output.
 *
 * Forwarding happens with a suspending `send` inside the select clause, so while the consumer
 * of the returned channel is not receiving, new source elements are not observed.
 *
 * An empty (already closed) source yields an empty output channel.
 *
 * @param context coroutine context the producing coroutine runs in; defaults to [Unconfined].
 * @param transform creates the inner channel for a source element.
 * @return a channel emitting the elements of the latest inner channel; the source channel is
 *         handed to `consumes()` as the producer's completion handler.
 */
fun <T, R> ReceiveChannel<T>.switchMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> {
    val input = this
    return produce(context, onCompletion = consumes()) {
        // An empty source results in an empty output instead of a ClosedReceiveChannelException.
        val first = input.receiveOrNull() ?: return@produce
        var current: ReceiveChannel<R> = transform(first)
        val output = this
        try {
            whileSelect {
                input.onReceiveOrNull { t: T? ->
                    t?.also {
                        // Stop the inner channel we are switching away from; otherwise its
                        // producer may stay suspended in send() forever.
                        current.cancel()
                        current = transform(it)
                    } != null
                }
                current.onReceiveOrNull { r ->
                    r?.also { output.send(it) } != null
                }
            }
        } finally {
            // Release the last inner channel when the output completes, fails or is cancelled.
            current.cancel()
        }
    }
}
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.channels.toList
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import org.junit.Test
import kotlin.coroutines.experimental.coroutineContext
/**
 * Prints which sender ([name]) is about to send [e] and on which thread, then sends [e]
 * to this channel.
 *
 * @param name label used in the log line to identify the sender.
 * @param e the element to log and send.
 */
suspend fun <E> SendChannel<E>.sendAndLog(name: String, e: E) {
    // Log before sending: send() may suspend until a receiver is ready.
    println("$name is sending $e on ${Thread.currentThread().name}")
    this.send(e)
}
/** Exercises `switchMap` on channels using small real-time delays. */
class ChannelSwitchMapTest {

    @Test
    fun `it switchMaps`() = runBlocking<Unit> {
        // Source: 1 immediately, 2 after 10 ms, 3 after a further 100 ms.
        val numbers = produce(coroutineContext) {
            send(1)
            delay(10)
            send(2)
            delay(100)
            send(3)
        }

        // Each inner channel emits "<i>A" and "<i>B" right away and "<i>C" 15 ms later.
        val letters = numbers.switchMap(coroutineContext) { i ->
            produce {
                send("${i}A")
                send("${i}B")
                delay(15)
                send("${i}C")
            }
        }

        val actual = letters.toList()

        // "1C" is missing because 2 arrives before the first inner channel gets to send it.
        assertThat(actual).isEqualTo(listOf("1A", "1B", "2A", "2B", "2C"))
    }
}
@fvasco
Copy link

fvasco commented Aug 1, 2018

You should wait for 3A, 3B, 3C and manage backpressure "on send".

My buggy version

https://gist.github.com/fvasco/09724edd594e0d0c41e8792fb69cbaad

@hannesstruss
Copy link
Author

@fvasco right, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment