Instantly share code, notes, and snippets.

Embed
What would you like to do?
Rx operator implementations from the five major classes outlined in https://lam.io/writing/ReactiveX using Kotlin reactive channels
import kotlin.collections.*
import org.reactivestreams.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import kotlinx.coroutines.*
import kotlin.coroutines.*
@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.debounce(timeout: Long, ctx: CoroutineContext) = GlobalScope.publish(ctx) {
var idx = 0;
consumeEach {
idx++
launch {
val stashed_idx = idx
delay(timeout)
if(stashed_idx == idx)
send(it)
}
}
}
@ExperimentalCoroutinesApi
fun <T> Publisher<T>.buffer(P: Publisher<*>, ctx: CoroutineContext) = GlobalScope.publish<List<T>>(ctx) {
var buffer = mutableListOf<T>()
launch {
consumeEach {
buffer.add(it)
}
}
P.consumeEach {
val buffer_to_send = buffer;
buffer = mutableListOf<T>()
send(buffer_to_send)
}
}
// see reactive channel examples for totally good merge example
// https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/coroutines-guide-reactive.md#merge
@ExperimentalCoroutinesApi
fun <T, U> Publisher<T>.flatmap(ctx: CoroutineContext, f: (T) -> Publisher<U>) = GlobalScope.publish(ctx) {
consumeEach {
launch {
f(it).consumeEach {
send(it)
}
}
}
}
@ExperimentalCoroutinesApi
fun <T, K> Publisher<T>.groupBy(ctx: CoroutineContext, k: (T) -> K) = GlobalScope.publish<Publisher<T>>(ctx) {
val M = mutableMapOf<K, ConflatedBroadcastChannel<T>>()
consumeEach {
val should_emit = !M.contains(k(it))
val ch = M.getOrPut(k(it), { ConflatedBroadcastChannel<T>() })
if(should_emit)
send(publish {
send(it)
}) // -> readonly
ch.offer(it)
yield()
}
for((_, ch) in M) {
ch.close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment