Debounce
import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.experimental.runBlocking
import kotlin.coroutines.experimental.CoroutineContext

fun <T> ReceiveChannel<T>.debounce(
    wait: Long = 300,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<T> = produce(context) {
    var nextTime = 0L
    consumeEach {
        val curTime = System.currentTimeMillis()
        if (curTime < nextTime) {
            // not enough time passed from last send
            delay(nextTime - curTime)
            var mostRecent = it
            while (!isEmpty) { mostRecent = receive() } // take the most recently sent without waiting
            nextTime += wait // maintain strict time interval between sends
            send(mostRecent)
        } else {
            // big pause between original events
            nextTime = curTime + wait // start tracking time interval from scratch
            send(it)
        }
    }
}

fun main(args: Array<String>) = runBlocking {
    val channel = produce<Int> {
        (0..100).forEach {
            println("send")
            send(it)
            delay(100)
        }
    }
    channel.debounce().consumeEach { println("Yay!") }
}

rciurkot commented Dec 20, 2017

In effect, this solution just drops every other event. It produces this output:

Yay! 0
Yay! 2
Yay! 4
Yay! 6
Yay! 8
Yay! 10
Yay! 12

For a wait time of 300 ms, it should look more like this:

Yay! 0
Yay! 2
Yay! 5
Yay! 8
Yay! 11
Yay! 14
Yay! 17

@elizarov please take a look at this solution:

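// needs these imports in addition to the ones in the gist:
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.launch

fun <T> ReceiveChannel<T>.debounce(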
        wait: Long = 300,
        context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<T> = produce(context) {
    var nextTime = 0L
    var lastVal: T
    var job: Job? = null
    consumeEach {
        lastVal = it
        if (job == null) {
            val curTime = System.currentTimeMillis()
            if (curTime < nextTime) {
                // not enough time passed from last send
                job = launch {
                    delay(nextTime - curTime)
                    nextTime += wait // maintain strict time interval between sends
                    job = null
                    send(lastVal)
                }
            } else {
                // big pause between original events
                nextTime = curTime + wait // start tracking time interval from scratch
                send(lastVal)
            }
        }
    }
    job?.join() //waiting for the last debouncing to end
}

When debouncing is needed, it is offloaded to a new coroutine (job = launch {}), which delays sending the value. It always sends the most recent value.
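
To check the expected sequence without running coroutines, here is a rough virtual-time model of the same logic in plain Kotlin. It is only a sketch: `simulateThrottleLatest` and its variable names are made up for illustration, event times are passed in as a sorted list of milliseconds, and it assumes that a delayed send due at time t runs before an event arriving at exactly t (in the real code that is a race).

```kotlin
// Virtual-time model of the launch-based debounce; not the coroutine code itself.
// Assumption: a delayed send due at time t runs before an event arriving at the same t.
fun simulateThrottleLatest(eventTimes: List<Long>, wait: Long = 300): List<Int> {
    val emitted = mutableListOf<Int>() // indices of the events that get sent downstream
    var nextTime = 0L                  // earliest time the next send is allowed
    var pendingFireAt: Long? = null    // deadline of the delayed send, if one is scheduled
    var lastIndex = -1                 // most recent event seen so far (plays the role of lastVal)

    for ((index, time) in eventTimes.withIndex()) {
        // A scheduled send whose deadline has passed fires first, with the latest value.
        val fireAt = pendingFireAt
        if (fireAt != null && fireAt <= time) {
            emitted += lastIndex
            nextTime += wait
            pendingFireAt = null
        }
        lastIndex = index
        if (pendingFireAt == null) {
            if (time < nextTime) {
                pendingFireAt = nextTime // too early: schedule a delayed send
            } else {
                nextTime = time + wait   // long pause: send immediately, restart the interval
                emitted += index
            }
        }
    }
    // Mirrors job?.join(): a send still pending when the input ends is delivered.
    if (pendingFireAt != null) emitted += lastIndex
    return emitted
}

fun main() {
    val times = (0..20).map { it * 100L } // one event every 100 ms
    println(simulateThrottleLatest(times).take(7)) // [0, 2, 5, 8, 11, 14, 17]
}
```

With one event every 100 ms and a wait of 300 ms, the model reproduces the 0, 2, 5, 8, ... sequence listed above.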

rciurkot commented Dec 20, 2017

However, if you want debouncing as described in the Rx docs (http://reactivex.io/documentation/operators/debounce.html), you can do it this way:

fun <T> ReceiveChannel<T>.debounceUntilSettledDown(
        settleTime: Long = 300,
        context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<T> = produce(context) {
    var job: Job? = null
    consumeEach {
        job?.cancel()
        job = launch {
            delay(settleTime)
            send(it)
        }
    }
    job?.join() //waiting for the last debouncing to end
}
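
To see which events survive this variant, a small virtual-time sketch in plain Kotlin may help. `simulateSettleDebounce` is a hypothetical helper, not part of the gist; it takes sorted event times in milliseconds and assumes that a gap of exactly `settleTime` counts as settled (in the real code that case is a race between the timer and the next event).

```kotlin
// Virtual-time model of "emit only after the source has been quiet for settleTime".
// An event survives if the next event arrives at least settleTime later,
// or if it is the last event (job?.join() lets the final delayed send finish).
fun simulateSettleDebounce(eventTimes: List<Long>, settleTime: Long = 300): List<Int> =
    eventTimes.indices.filter { i ->
        i == eventTimes.lastIndex || eventTimes[i + 1] - eventTimes[i] >= settleTime
    }

fun main() {
    // Bursts separated by pauses: only the last event of each burst is kept.
    println(simulateSettleDebounce(listOf(0L, 100L, 200L, 900L, 1000L, 2000L))) // [2, 4, 5]
    // The gist's demo producer (one event every 100 ms) never settles until it ends.
    println(simulateSettleDebounce((0..100).map { it * 100L })) // [100]
}
```

Under these assumptions, the demo producer from the gist would yield only its final value, since the stream never pauses for 300 ms before it closes.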