Skip to content

Instantly share code, notes, and snippets.

@hrach
Last active May 24, 2021 08:52
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hrach/a552727f98e3eb7a241632a05c4ca9cb to your computer and use it in GitHub Desktop.
Save hrach/a552727f98e3eb7a241632a05c4ca9cb to your computer and use it in GitHub Desktop.
Kotlin Channels Debounce & Throttle
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Returns a channel that only forwards an element once no newer element has
 * arrived from this channel for [wait] milliseconds.
 *
 * Every received element cancels the previously scheduled send and schedules
 * a new one. After this channel has been fully consumed, the producer waits
 * for the last scheduled send to finish before completing.
 *
 * @param wait quiet period, in milliseconds, that must pass before an element is forwarded.
 * @param context coroutine context handed to [produce].
 * @return a new channel carrying the debounced elements.
 */
fun <E> ReceiveChannel<E>.debounce(
    wait: Long = 50,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> = produce(context) {
    // Timer job for the most recent element; null until the first element arrives.
    var pendingSend: Job? = null

    consumeEach { element ->
        // A newer element supersedes whatever was still waiting to be sent.
        pendingSend?.cancel()
        // NOTE(review): launch is called without an explicit context here; confirm
        // against the coroutines version in use whether the timer is a child of
        // this producer or runs independently of it.
        pendingSend = launch {
            delay(wait)
            send(element)
        }
    }

    // Upstream is exhausted: let the final timer fire before the producer completes.
    pendingSend?.join()
}
/**
 * Returns a channel that forwards the first element received from this channel
 * and then drops every element that arrives less than [wait] milliseconds after
 * the most recently forwarded one.
 *
 * Elapsed time is measured with the monotonic [System.nanoTime] clock rather
 * than the wall clock, so changes to the system time (manual adjustment, NTP
 * corrections) can neither stall the channel nor let elements through early.
 *
 * @param wait minimum gap, in milliseconds, between two forwarded elements.
 *   Zero or negative values forward every element.
 * @param context coroutine context handed to [produce].
 * @return a new channel carrying the throttled elements.
 */
fun <E> ReceiveChannel<E>.throttle(
    wait: Long = 200,
    context: CoroutineContext = DefaultDispatcher
): ReceiveChannel<E> = produce(context) {
    // toNanos saturates instead of overflowing, so a huge `wait` means
    // "forward only the first element" rather than wrapping around.
    val minGapNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    var lastForwardedAt = 0L
    var forwardedAny = false

    consumeEach { element ->
        val now = System.nanoTime()
        // nanoTime has an arbitrary origin (it may even be negative), so the first
        // element is tracked with a flag and later ones are compared by difference.
        if (!forwardedAny || now - lastForwardedAt >= minGapNanos) {
            forwardedAny = true
            // Record the time before sending: the gap is measured from the moment
            // the element was accepted, not from when the receiver picked it up.
            lastForwardedAt = now
            send(element)
        }
    }
}
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.consumeEach
import kotlinx.coroutines.experimental.channels.produce
import kotlin.coroutines.experimental.CoroutineContext
/**
 * Demo: produces the integers 0 through 100, pausing 20 ms after each one,
 * and prints the values that make it through [throttle] with its default settings.
 */
fun main(args: Array<String>) = runBlocking {
    val numbers = produce<Int> {
        for (value in 0..100) {
            send(value)
            delay(20)
        }
    }

    val throttled = numbers.throttle()
    throttled.consumeEach { value -> println(value) }
}
@JakubNeukirch
Copy link

JakubNeukirch commented Sep 27, 2019

Updated to work with newer kotlinx.coroutines versions, in which the `experimental` package was removed (`kotlinx.coroutines.experimental` became `kotlinx.coroutines`):
` import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlin.coroutines.CoroutineContext

/**
 * Returns a channel that only forwards an element once no newer element has
 * arrived from this channel for [wait] milliseconds.
 *
 * Every received element cancels the previously scheduled send and schedules
 * a new one. After this channel has been fully consumed, the producer waits
 * for the last scheduled send to finish before completing.
 *
 * The producer is started in its own scope built from [context]. It must not be
 * started inside `withContext`: `withContext` only returns after all of its
 * children have completed, while the producer cannot complete until somebody
 * receives from the channel that has not been returned yet.
 *
 * The `suspend` modifier is kept only so existing call sites keep compiling;
 * the function itself returns immediately.
 *
 * @param wait quiet period, in milliseconds, that must pass before an element is forwarded.
 * @param context coroutine context the producer coroutine runs in.
 * @return a new channel carrying the debounced elements.
 */
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <E> ReceiveChannel<E>.debounce(
    wait: Long = 50,
    context: CoroutineContext = Dispatchers.Default
): ReceiveChannel<E> {
    return CoroutineScope(context).produce {
        // Timer job for the most recent element; null until the first element arrives.
        var pendingSend: Job? = null
        consumeEach { element ->
            // A newer element supersedes whatever was still waiting to be sent.
            pendingSend?.cancel()
            pendingSend = launch {
                delay(wait)
                send(element)
            }
        }
        // Upstream is exhausted: let the final timer fire before the producer completes.
        pendingSend?.join()
    }
}

/**
 * Returns a channel that forwards the first element received from this channel
 * and then drops every element that arrives less than [wait] milliseconds after
 * the most recently forwarded one.
 *
 * Elapsed time is measured with the monotonic [System.nanoTime] clock rather
 * than the wall clock, so changes to the system time can neither stall the
 * channel nor let elements through early.
 *
 * The producer is started in its own scope built from [context]. It must not be
 * started inside `withContext`: `withContext` only returns after all of its
 * children have completed, while the producer suspends on its first `send`
 * until somebody receives from the channel that has not been returned yet.
 *
 * The `suspend` modifier is kept only so existing call sites keep compiling;
 * the function itself returns immediately.
 *
 * @param wait minimum gap, in milliseconds, between two forwarded elements.
 *   Zero or negative values forward every element.
 * @param context coroutine context the producer coroutine runs in.
 * @return a new channel carrying the throttled elements.
 */
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
suspend fun <E> ReceiveChannel<E>.throttle(
    wait: Long = 200,
    context: CoroutineContext = Dispatchers.Default
): ReceiveChannel<E> {
    // toNanos saturates instead of overflowing for very large `wait` values.
    val minGapNanos = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(wait)
    return CoroutineScope(context).produce {
        var lastForwardedAt = 0L
        var forwardedAny = false
        consumeEach { element ->
            val now = System.nanoTime()
            // nanoTime has an arbitrary origin, so the first element is tracked with
            // a flag and later ones are compared by difference.
            if (!forwardedAny || now - lastForwardedAt >= minGapNanos) {
                forwardedAny = true
                lastForwardedAt = now
                send(element)
            }
        }
    }
} `

@loongee
Copy link

loongee commented Apr 8, 2020

System.currentTimeMillis() is not reliable here: it reads the wall clock, which the user can change at any time. On Android, SystemClock.uptimeMillis() is a much better choice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment