Skip to content

Instantly share code, notes, and snippets.

@kevinherron
Last active December 17, 2018 22:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinherron/5a895911a45eb5752ff247a205d618b7 to your computer and use it in GitHub Desktop.
Save kevinherron/5a895911a45eb5752ff247a205d618b7 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.delay
import java.time.Duration
import java.util.concurrent.TimeUnit
/**
 * Convenience overload of [debounce] that takes the timeout in milliseconds.
 *
 * @param timeout timeout in milliseconds; converted with [Duration.ofMillis].
 * @param scope passed through unchanged to the [Duration]-based overload.
 * @return the channel returned by the [Duration]-based overload for this receiver.
 */
fun <T> ReceiveChannel<T>.debounce(timeout: Long, scope: CoroutineScope): ReceiveChannel<T> {
    val window = Duration.ofMillis(timeout)
    return debounce(window, scope)
}
/**
 * Returns a new conflated channel, produced in [scope], that relays the elements of
 * this channel through the rate-limiting [SendChannel.debounce] actor.
 *
 * Every element received from this channel is handed to that actor, which in turn
 * forwards into the returned channel while keeping at least [timeout] between two
 * forwarded elements.
 *
 * NOTE(review): an element still pending inside the actor when this channel completes
 * is presumably not delivered, because the produced channel is closed as soon as the
 * producer block returns — confirm this is the intended trailing-edge behaviour.
 *
 * @param timeout minimum spacing between two forwarded elements.
 * @param scope scope in which both the producer and the internal actor are launched.
 * @return the receiving side of the rate-limited stream.
 */
fun <T> ReceiveChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): ReceiveChannel<T> {
    return scope.produce(capacity = Channel.CONFLATED) {
        // Rate-limiting stage that writes into this producer's own output channel.
        val db = channel.debounce(timeout, scope)
        try {
            this@debounce.consumeEach { db.send(it) }
        } finally {
            // Close the actor's mailbox once the source is exhausted (or this producer
            // is cancelled). Otherwise the actor stays suspended waiting for input
            // forever, leaking a coroutine in `scope` and preventing a structured
            // scope from ever completing.
            db.close()
        }
    }
}
/**
 * Convenience overload of [debounce] that takes the timeout in milliseconds.
 *
 * @param timeout timeout in milliseconds; converted with [Duration.ofMillis].
 * @param scope passed through unchanged to the [Duration]-based overload.
 * @return the channel returned by the [Duration]-based overload for this receiver.
 */
fun <T> SendChannel<T>.debounce(timeout: Long, scope: CoroutineScope): SendChannel<T> {
    val window = Duration.ofMillis(timeout)
    return debounce(window, scope)
}
/**
 * Wraps this channel in a conflated actor, launched in [scope], that forwards elements
 * to this channel while keeping at least [timeout] between two forwarded elements.
 *
 * The first element is forwarded immediately. For every later element the actor waits
 * out whatever is left of [timeout] since the previous forward before sending it on.
 * The actor's mailbox is [Channel.CONFLATED], so elements sent while the actor is
 * waiting overwrite each other and only the most recent one is picked up next.
 *
 * Forwarding uses a plain `send` on this channel with no error handling, so a failure
 * of that `send` (for example because this channel was closed) fails the actor.
 *
 * @param timeout minimum spacing between two forwarded elements.
 * @param scope scope in which the actor coroutine is launched.
 * @return the actor's channel; send to it instead of sending to this channel directly.
 */
fun <T> SendChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): SendChannel<T> {
    return scope.actor(capacity = Channel.CONFLATED) {
        val timeoutNanos = timeout.toNanos()
        val nanosPerMilli = TimeUnit.MILLISECONDS.toNanos(1)

        // System.nanoTime() has an arbitrary (possibly negative) origin, so no numeric
        // sentinel can safely mean "nothing sent yet"; track that state explicitly.
        var hasSent = false
        var lastSend = 0L

        channel.consumeEach { element ->
            if (hasSent) {
                val elapsedNanos = System.nanoTime() - lastSend
                if (elapsedNanos < timeoutNanos) {
                    // Defensive upper bound: never wait longer than one full timeout.
                    val remainingNanos = (timeoutNanos - elapsedNanos)
                        .coerceAtMost(timeoutNanos)
                    // Round up to whole milliseconds. Truncating would turn a
                    // sub-millisecond remainder into delay(0) and forward too early.
                    val wholeMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos)
                    val delayMillis =
                        if (remainingNanos % nanosPerMilli == 0L) wholeMillis else wholeMillis + 1
                    delay(delayMillis)
                }
            }
            this@debounce.send(element)
            lastSend = System.nanoTime()
            hasSent = true
        }
    }
}
/**
 * Timing-based tests for the `debounce` extensions on [ReceiveChannel] and [SendChannel].
 *
 * Each case emits the integers 0..9 with a fixed pause after every element, routes them
 * through a debounced channel, and checks how many elements reach the consumer.
 *
 * The producers and debounce stages deliberately stay on [GlobalScope], exactly as in
 * the original tests, so that failures inside those background coroutines are not
 * propagated into the `runBlocking` scope of the test itself.
 */
class DebounceTest {

    @Test
    fun testReceiveChannelDebounce() = runBlocking {
        // Emits 0..9, pausing `delayMillis` after each element.
        fun newProducer(delayMillis: Long) = GlobalScope.produce {
            (0..9).forEach {
                send(it)
                delay(delayMillis)
            }
        }

        consumeAndAssertCount(newProducer(0).debounce(100, GlobalScope), 1)
        consumeAndAssertCount(newProducer(100).debounce(600, GlobalScope), 2)
        consumeAndAssertCount(newProducer(100).debounce(150, GlobalScope), 7)
        consumeAndAssertCount(newProducer(100).debounce(50, GlobalScope), 10)
    }

    @Test
    fun testSendChannelDebounce() = runBlocking {
        // Emits 0..9 into a debounced view of the producer's own channel,
        // pausing `delay` milliseconds after each element.
        fun newProducer(delay: Long, timeout: Long) = GlobalScope.produce<Int> {
            val db = this.channel.debounce(timeout, GlobalScope)
            (0..9).forEach {
                db.send(it)
                delay(delay)
            }
        }

        consumeAndAssertCount(newProducer(delay = 0, timeout = 100), 1)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 600), 2)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 150), 7)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 50), 10)
    }

    /**
     * Drains [db] until it is closed, printing the spacing between consecutive elements,
     * and asserts that exactly [count] elements were received.
     *
     * This is a suspending function so it runs inside the calling test's coroutine
     * instead of starting a nested `runBlocking`.
     */
    private suspend fun consumeAndAssertCount(db: ReceiveChannel<Int>, count: Int) {
        var consumed = 0
        var lastReceived = 0L

        db.consumeEach {
            val now = System.currentTimeMillis()
            if (lastReceived > 0) {
                println("$it @ +${now - lastReceived}ms")
            } else {
                println("$it @ initial")
            }
            lastReceived = now
            consumed++
        }

        // Expected value first, actual second, so failure messages read correctly.
        assertEquals(count, consumed)
    }
}
@kevinherron
Copy link
Author

Updated for kotlinx.coroutines 1.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment