Last active
August 6, 2018 06:57
-
-
Save hannesstruss/927ec8120d7cb312d80685f230d50c6e to your computer and use it in GitHub Desktop.
Attempt at implementing RxJava's switchMap for Kotlin Coroutine Channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.experimental.Unconfined | |
import kotlinx.coroutines.experimental.channels.ReceiveChannel | |
import kotlinx.coroutines.experimental.channels.consumes | |
import kotlinx.coroutines.experimental.channels.produce | |
import kotlinx.coroutines.experimental.selects.whileSelect | |
import kotlin.coroutines.experimental.CoroutineContext | |
/**
 * Maps every element of this channel to an inner channel via [transform] and relays the
 * elements of the most recently created inner channel only, in the spirit of RxJava's
 * `switchMap`.
 *
 * Behavior:
 * - When the source emits a new element, the previous inner channel is cancelled and
 *   replaced by the one returned from [transform].
 * - The returned channel is closed once the source is closed AND the latest inner channel
 *   is closed; an empty source therefore yields an empty result instead of failing.
 * - An inner channel that completes does not end the operator while the source is still
 *   open; later source elements are still mapped.
 * - The source is consumed (cancelled on completion) through `consumes()`.
 *
 * Limitation: closure is detected with `onReceiveOrNull`, so a `null` element emitted by
 * either the source or an inner channel is indistinguishable from that channel closing.
 *
 * @param context context the relaying coroutine runs in.
 * @param transform creates the inner channel for a source element.
 * @return a channel carrying the elements of the latest inner channel.
 */
fun <T, R> ReceiveChannel<T>.switchMap(context: CoroutineContext = Unconfined,
                                       transform: suspend (T) -> ReceiveChannel<R>): ReceiveChannel<R> {
    val input = this
    return produce(context, onCompletion = consumes()) {
        val output = this
        // Inner channel currently being relayed; null before the first source element
        // and after the latest inner channel has completed.
        var current: ReceiveChannel<R>? = null
        var inputOpen = true
        try {
            whileSelect {
                // Only register clauses for channels that are still live; both lambdas
                // return false (ending the loop) once neither side can produce anymore.
                if (inputOpen) {
                    input.onReceiveOrNull { t: T? ->
                        if (t == null) {
                            inputOpen = false
                        } else {
                            // Switch: stop the outdated producer before replacing it.
                            current?.cancel()
                            current = transform(t)
                        }
                        inputOpen || current != null
                    }
                }
                val inner = current
                if (inner != null) {
                    inner.onReceiveOrNull { r ->
                        if (r == null) {
                            current = null
                        } else {
                            // send() suspends until the consumer is ready (backpressure).
                            output.send(r)
                        }
                        inputOpen || current != null
                    }
                }
            }
        } finally {
            // Reached with a live inner channel only on failure or cancellation.
            current?.cancel()
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.truth.Truth.assertThat | |
import kotlinx.coroutines.experimental.channels.SendChannel | |
import kotlinx.coroutines.experimental.channels.produce | |
import kotlinx.coroutines.experimental.channels.toList | |
import kotlinx.coroutines.experimental.delay | |
import kotlinx.coroutines.experimental.runBlocking | |
import org.junit.Test | |
import kotlin.coroutines.experimental.coroutineContext | |
/**
 * Prints which sender ([name]) is about to send [e] and on which thread, then sends [e]
 * to this channel.
 */
suspend fun <E> SendChannel<E>.sendAndLog(name: String, e: E) {
    val message = "$name is sending $e on ${Thread.currentThread().name}"
    println(message)
    this.send(e)
}
/** Exercises `switchMap` with a timed source and timed inner channels. */
class ChannelSwitchMapTest {
    @Test fun `it switchMaps`() = runBlocking<Unit> {
        // Source timeline: 1 immediately, 2 after 10 ms, 3 after a further 100 ms.
        val source = produce(coroutineContext) {
            send(1)
            delay(10)
            send(2)
            delay(100)
            send(3)
        }
        // Each inner channel emits "<i>A" and "<i>B" right away and "<i>C" 15 ms later.
        val switchMapped = source.switchMap(coroutineContext) { i ->
            produce {
                send("${i}A")
                send("${i}B")
                delay(15)
                send("${i}C")
            }
        }
        val actual = switchMapped.toList()
        // Pins the switch itself: "1C" (due at 15 ms) must be dropped because 2 arrives
        // at 10 ms, while the inner channel for 2 runs to completion. What is emitted for
        // the third source value depends on completion handling and is not pinned here.
        assertThat(actual.take(5)).isEqualTo(listOf("1A", "1B", "2A", "2B", "2C"))
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
You should wait for
3A, 3B, 3C
and manage backpressure "on send". My bugged version:
https://gist.github.com/fvasco/09724edd594e0d0c41e8792fb69cbaad