// Rx operator implementations, one or more from each of the five major operator classes outlined in
// https://lam.io/writing/ReactiveX, built on Kotlin reactive channels.
import kotlin.collections.* | |
import org.reactivestreams.* | |
import kotlinx.coroutines.channels.* | |
import kotlinx.coroutines.reactive.* | |
import kotlinx.coroutines.* | |
import kotlin.coroutines.* | |
/**
 * Emits an upstream item only if no newer item arrives within [timeout] after it.
 *
 * Every incoming item takes a ticket number and starts a timer in a child coroutine of the
 * publisher. When the timer fires, the item is forwarded only if its ticket is still the most
 * recent one, i.e. nothing newer has arrived in the meantime.
 *
 * The ticket is captured *before* the child coroutine is launched. Capturing it inside the
 * coroutine would let a late-starting coroutine read a newer ticket and emit a stale item.
 *
 * NOTE(review): the ticket counter is a plain `Int`; this assumes [ctx] does not run the
 * collector and the timers on different threads at the same time. Confirm against callers.
 *
 * @param T element type of the stream.
 * @param U unused; kept only so existing call sites that spell out both type arguments still compile.
 * @param timeout quiet period passed to `delay`, in milliseconds.
 * @param ctx coroutine context the publisher coroutine runs in.
 * @return a publisher of the debounced items.
 */
@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.debounce(timeout: Long, ctx: CoroutineContext): Publisher<T> = GlobalScope.publish<T>(ctx) {
    var latestTicket = 0
    consumeEach { item ->
        // Snapshot taken synchronously with the arrival of the item, not when the timer starts.
        val ticket = ++latestTicket
        launch {
            delay(timeout)
            if (ticket == latestTicket) send(item)
        }
    }
}
/**
 * Collects upstream items into lists and emits the current list each time [P] signals.
 *
 * A boundary signal always emits, even when nothing was buffered (an empty list is sent).
 * When the source completes, the boundary subscription is cancelled and any items buffered since
 * the last boundary signal are emitted as a final list, so trailing items are not lost.
 * If [P] completes first, buffering continues and everything is emitted in that final list.
 *
 * The batch is snapshotted before `send` and removed from the buffer only after `send` returns.
 * If the boundary coroutine is cancelled while `send` is suspended, the items stay in the buffer
 * and go out with the final flush.
 *
 * NOTE(review): the buffer is a plain mutable list shared by two coroutines; this assumes [ctx]
 * does not run them in parallel on different threads. Confirm against callers.
 *
 * @param P boundary publisher; its values are ignored, only the signal matters.
 * @param ctx coroutine context the publisher coroutine runs in.
 * @return a publisher of the buffered batches.
 */
@ExperimentalCoroutinesApi
fun <T> Publisher<T>.buffer(P: Publisher<*>, ctx: CoroutineContext): Publisher<List<T>> =
    GlobalScope.publish<List<T>>(ctx) {
        val pending = mutableListOf<T>()

        val boundaryJob = launch {
            P.consumeEach {
                val batch = pending.toList()
                send(batch)
                // Items that arrived while send() was suspended stay queued for the next batch.
                pending.subList(0, batch.size).clear()
            }
        }

        consumeEach { pending.add(it) }

        // Source is done: stop listening for boundaries and flush what is left.
        boundaryJob.cancelAndJoin()
        if (pending.isNotEmpty()) send(pending.toList())
    }
// See the kotlinx.coroutines reactive guide for a complete, working merge example:
// https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/coroutines-guide-reactive.md#merge | |
/**
 * Maps each upstream item to an inner publisher with [f] and merges all inner emissions into a
 * single output stream.
 *
 * Every inner publisher is collected in its own child coroutine, so inner streams are consumed
 * concurrently and their items are forwarded as they arrive.
 *
 * @param ctx coroutine context the publisher coroutine runs in.
 * @param f produces the inner publisher for an upstream item; invoked inside the child coroutine.
 */
@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.flatmap(ctx: CoroutineContext, f: (T) -> Publisher<U>) = GlobalScope.publish(ctx) {
    consumeEach { outer ->
        launch {
            val inner = f(outer)
            inner.consumeEach { element -> send(element) }
        }
    }
}
/**
 * Splits the stream into one publisher per key, where the key of an item is computed by [k].
 *
 * The first time a key is seen, a new group publisher is emitted downstream; that item and every
 * later item with the same key are delivered through it. Each group is backed by an unlimited
 * channel, so items are queued rather than dropped until the group is subscribed.
 *
 * Each group publisher is meant for a single subscriber: several subscribers to one group would
 * share the same channel and split its items between them.
 *
 * All groups are closed when the source terminates. If the source fails, the failure is passed to
 * each group as the close cause; cancellation closes the groups without a cause.
 *
 * @param ctx coroutine context used for the outer publisher and for every group publisher.
 * @param k key selector, evaluated once per item.
 * @return a publisher that emits one publisher per distinct key.
 */
@ExperimentalCoroutinesApi
fun <T, K> Publisher<T>.groupBy(ctx: CoroutineContext, k: (T) -> K): Publisher<Publisher<T>> =
    GlobalScope.publish<Publisher<T>>(ctx) {
        val groups = mutableMapOf<K, Channel<T>>()
        var failure: Throwable? = null
        try {
            consumeEach { item ->
                val key = k(item)
                val group = groups[key] ?: Channel<T>(Channel.UNLIMITED).also { created ->
                    groups[key] = created
                    // Announce the new group before its first item is queued.
                    send(GlobalScope.publish<T>(ctx) {
                        for (element in created) send(element)
                    })
                }
                // Never suspends: the channel has unlimited capacity.
                group.send(item)
            }
        } catch (e: Throwable) {
            // Cancellation is not a source failure, so it is not forwarded to the groups as an error.
            if (e !is CancellationException) failure = e
            throw e
        } finally {
            for (group in groups.values) group.close(failure)
        }
    }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment