Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active August 26, 2019 00:19
Show Gist options
  • Save elizarov/69ff0cf47d9ffe013bc6c1aeaf5af552 to your computer and use it in GitHub Desktop.
Save elizarov/69ff0cf47d9ffe013bc6c1aeaf5af552 to your computer and use it in GitHub Desktop.
Debounce
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Returns a channel that forwards elements of this channel no more often than once per [wait]
 * milliseconds.
 *
 * An element that arrives after the current interval has already elapsed is forwarded immediately
 * and starts a new interval. An element that arrives earlier makes the producer suspend until the
 * interval ends; after that, any elements that can be taken from this channel without waiting are
 * drained and only the most recent one is forwarded.
 *
 * Elapsed time is measured with the monotonic [System.nanoTime] rather than the wall clock, so
 * system clock adjustments cannot stretch or shorten the interval.
 *
 * @param wait minimum interval between forwarded elements, in milliseconds.
 * @param context coroutine context passed to [produce] for the forwarding coroutine.
 * @return the rate-limited channel.
 */
fun <T> ReceiveChannel<T>.debounce(
    wait: Long = 300,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<T> = produce(context) {
    // Monotonic time in milliseconds; only differences between two readings are meaningful.
    fun nowMillis(): Long = System.nanoTime() / 1000000L

    // Seeded with the current reading so the very first element always takes the "send at once" path.
    var nextTime = nowMillis()
    consumeEach { element ->
        val curTime = nowMillis()
        if (curTime < nextTime) {
            // not enough time passed from last send
            delay(nextTime - curTime)
            var mostRecent = element
            while (!isEmpty) { mostRecent = receive() } // take the most recently sent without waiting
            nextTime += wait // maintain strict time interval between sends
            send(mostRecent)
        } else {
            // big pause between original events
            nextTime = curTime + wait // start tracking time interval from scratch
            send(element)
        }
    }
}
/**
 * Demo: a producer emits the numbers 0..100, printing "send" before each one and pausing 100 ms
 * after it, while the debounced stream prints "Yay!" for every element that gets through.
 */
fun main(args: Array<String>) = runBlocking {
    val numbers = produce<Int> {
        for (value in 0..100) {
            println("send")
            send(value)
            delay(100)
        }
    }
    val debounced = numbers.debounce()
    debounced.consumeEach { println("Yay!") }
}
@rciurkot
Copy link

However, if you want debouncing as described in the Rx docs (http://reactivex.io/documentation/operators/debounce.html), where a value is emitted only after the source has been quiet for a given time, you can do it this way:

/**
 * Returns a channel that forwards an element of this channel only after [settleTime] milliseconds
 * have passed without a newer element arriving.
 *
 * Every incoming element cancels the previously scheduled send and schedules a new one, so a
 * burst of elements results in a send being scheduled only for the last one.
 *
 * @param settleTime quiet period, in milliseconds, to wait before forwarding an element.
 * @param context coroutine context passed to [produce] for the consuming coroutine.
 * @return the debounced channel.
 */
fun <T> ReceiveChannel<T>.debounceUntilSettledDown(
        settleTime: Long = 300,
        context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<T> = produce(context) {
    // The send scheduled for the latest element, if any.
    var pendingSend: Job? = null
    consumeEach { latest ->
        // A newer element supersedes whatever was waiting to be sent.
        pendingSend?.cancel()
        // NOTE(review): launch is called without an explicit context, so it does not receive
        // `context` from this function — confirm that is the intended dispatcher/parent.
        pendingSend = launch {
            delay(settleTime)
            send(latest)
        }
    }
    // The source is exhausted: wait for the last scheduled send before the producer completes.
    pendingSend?.join()
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment