Skip to content

Instantly share code, notes, and snippets.

@hannesstruss
Last active June 18, 2018 15:05
Show Gist options
  • Save hannesstruss/b3f925f6b46d4178abbf18c8d0703a3c to your computer and use it in GitHub Desktop.
Save hannesstruss/b3f925f6b46d4178abbf18c8d0703a3c to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.experimental.Unconfined
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumes
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.channels.toList
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import kotlinx.coroutines.experimental.selects.whileSelect
import kotlinx.coroutines.experimental.test.TestCoroutineContext
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Maps each element of this channel to an inner channel via [transform] and relays only the
 * elements of the most recently created inner channel.
 *
 * When a new upstream element arrives, the inner channel that was active until then is
 * cancelled and replaced by `transform(element)`.
 *
 * Completion: the returned channel completes as soon as either the upstream channel or the
 * currently active inner channel is closed, whichever happens first. An upstream that closes
 * without emitting anything results in an empty output channel.
 *
 * Note: closure is detected through `receiveOrNull`/`onReceiveOrNull`, so a `null` element
 * (possible when [T] or [R] is a nullable type) is indistinguishable from a closed channel
 * and ends the operator.
 *
 * @param context coroutine context the relaying producer runs in.
 * @param transform creates the inner channel for an upstream element.
 * @return a channel carrying the elements of the latest inner channel; the upstream channel
 *   is consumed when the returned channel completes (via `onCompletion = consumes()`).
 */
fun <T, R> ReceiveChannel<T>.switchMap(
    context: CoroutineContext = Unconfined,
    transform: suspend (T) -> ReceiveChannel<R>
): ReceiveChannel<R> {
    val input = this
    return produce(context, onCompletion = consumes()) {
        // A bare receive() would throw ClosedReceiveChannelException on an empty upstream;
        // treat that case as "nothing to map" and finish with an empty output instead.
        val first = input.receiveOrNull() ?: return@produce
        var current: ReceiveChannel<R> = transform(first)
        val output = this
        try {
            whileSelect {
                input.onReceiveOrNull { t: T? ->
                    t?.also {
                        // Stop the superseded inner producer so it does not stay suspended
                        // forever in a send() that nobody will ever receive.
                        current.cancel()
                        current = transform(it)
                    } != null
                }
                current.onReceiveOrNull { r ->
                    r?.also { output.send(it) } != null
                }
            }
        } finally {
            // Release whichever inner channel is still active when the loop ends or fails.
            current.cancel()
        }
    }
}
/**
 * Demo of [switchMap] driven by virtual time.
 *
 * The source emits 1, then 2 after 10 ms, then 3 after a further 100 ms. Each value is mapped
 * to an inner channel that emits "<i>A" and "<i>B" immediately and "<i>C" after 15 ms.
 * Prints the collected list and whether it equals the expected sequence.
 */
fun main(args: Array<String>) {
    runBlocking {
        val ctx = TestCoroutineContext()
        val source = produce(ctx) {
            send(1)
            delay(10)
            send(2)
            delay(100)
            send(3)
        }
        val switchMapped = source.switchMap(ctx) { i ->
            produce(ctx) {
                send("${i}A")
                send("${i}B")
                delay(15)
                send("${i}C")
            }
        }
        // Collect inside the test context as well: every producer/consumer hand-off is then
        // executed by advanceTimeBy() below. A collector running on another dispatcher would
        // resume the producers by enqueueing work on ctx that nothing triggers afterwards.
        val collected = async(ctx) { switchMapped.toList() }
        ctx.advanceTimeBy(1, TimeUnit.HOURS)
        val list = collected.await()
        println(list)
        println(list == listOf("1A", "1B", "2A", "2B", "2C"))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment