Skip to content

Instantly share code, notes, and snippets.

@hrach
Last active May 24, 2021 08:52
Show Gist options
  • Save hrach/a552727f98e3eb7a241632a05c4ca9cb to your computer and use it in GitHub Desktop.
Save hrach/a552727f98e3eb7a241632a05c4ca9cb to your computer and use it in GitHub Desktop.
Kotlin Channels Debounce & Throttle
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Returns a channel that forwards an element from this channel only after [wait] has
 * passed without a newer element arriving. Each new element cancels the pending timer
 * of the previous one, so during a burst only the last element is forwarded.
 *
 * When this channel is exhausted, the still-pending element (if any) is awaited and
 * forwarded before the producing coroutine finishes.
 *
 * @param wait quiet period passed to `delay` (milliseconds in the `delay(Long)` API).
 * @param context coroutine context in which the producing coroutine runs.
 * @return a new channel carrying the debounced elements.
 */
fun <E> ReceiveChannel<E>.debounce(
    wait: Long = 50,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> = produce(context) {
    var pendingSend: Job? = null
    consumeEach { element ->
        // A newer element supersedes whatever was still waiting to be sent.
        pendingSend?.cancel()
        // Launch inside the producer's own context so the timer uses the requested
        // dispatcher and is a child of the producer's job: cancelling the returned
        // channel then also cancels the pending timer instead of leaking it.
        pendingSend = launch(coroutineContext) {
            delay(wait)
            send(element)
        }
    }
    // Source is exhausted: let the last scheduled element go out before finishing.
    pendingSend?.join()
}
/**
 * Returns a channel that forwards an element from this channel only when at least
 * [wait] milliseconds have elapsed since the previously forwarded element. Elements
 * arriving inside that window are dropped. The first element is always forwarded.
 *
 * Elapsed time is measured with the monotonic [System.nanoTime] rather than the wall
 * clock, so changes to the system time cannot stall or prematurely reopen the window.
 *
 * @param wait minimum interval, in milliseconds, between two forwarded elements.
 * @param context coroutine context in which the producing coroutine runs.
 * @return a new channel carrying the throttled elements.
 */
fun <E> ReceiveChannel<E>.throttle(
    wait: Long = 200,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> = produce(context) {
    // TimeUnit.toNanos saturates instead of overflowing for very large waits.
    val waitNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    var hasForwarded = false
    var lastForwardedAt = 0L
    consumeEach { element ->
        val now = System.nanoTime()
        // nanoTime values are only meaningful as differences, so compare the elapsed
        // span (overflow-safe) instead of an absolute deadline.
        if (!hasForwarded || now - lastForwardedAt >= waitNanos) {
            hasForwarded = true
            // The window starts when the element is accepted, before send() may suspend.
            lastForwardedAt = now
            send(element)
        }
    }
}
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Demo entry point: produces the integers 0 through 100, pausing 20 after each one via
 * `delay`, pipes them through `throttle()` with its default settings, and prints every
 * value that gets through.
 */
fun main(args: Array<String>) = runBlocking {
    val numbers = produce<Int> {
        for (number in 0..100) {
            send(number)
            delay(20)
        }
    }
    val throttled = numbers.throttle()
    throttled.consumeEach { value -> println(value) }
}
@loongee
Copy link

loongee commented Apr 8, 2020

System.currentTimeMillis() is not reliable here: it follows the wall clock, which the user can change at any time. SystemClock.uptimeMillis() is a much better choice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment