Skip to content

Instantly share code, notes, and snippets.

@kevinherron
Last active December 17, 2018 22:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinherron/5a895911a45eb5752ff247a205d618b7 to your computer and use it in GitHub Desktop.
Save kevinherron/5a895911a45eb5752ff247a205d618b7 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.delay
import java.time.Duration
import java.util.concurrent.TimeUnit
/**
 * Millisecond convenience overload of the [Duration]-based `debounce` for receive channels.
 *
 * @param timeout debounce interval in milliseconds; converted with [Duration.ofMillis].
 * @param scope scope handed unchanged to the [Duration]-based overload.
 * @return the channel produced by the [Duration]-based overload.
 */
fun <T> ReceiveChannel<T>.debounce(timeout: Long, scope: CoroutineScope): ReceiveChannel<T> {
    val interval = Duration.ofMillis(timeout)
    return debounce(interval, scope)
}
/**
 * Returns a new conflated channel that re-emits this channel's elements through the
 * [SendChannel]-based `debounce`, so consecutive emissions are spaced by at least [timeout]
 * (subject to that overload's millisecond granularity).
 *
 * This channel is consumed by the returned producer: `consumeEach` cancels it once iteration
 * finishes, whether normally or with an exception.
 *
 * NOTE(review): an element that is still waiting out its delay when the upstream finishes may
 * never be delivered, because the producer's own channel is closed as soon as the block below
 * returns — confirm whether a trailing flush is desired.
 *
 * @param timeout minimum interval between two forwarded elements.
 * @param scope scope in which both the producer and the internal debouncing actor are started.
 * @return the receive side of the debounced stream.
 */
fun <T> ReceiveChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): ReceiveChannel<T> {
    return scope.produce(capacity = Channel.CONFLATED) {
        // `channel` is this producer's own output; wrapping it rate-limits everything forwarded to it.
        val debounced = channel.debounce(timeout, scope)
        try {
            this@debounce.consumeEach { debounced.send(it) }
        } finally {
            // Close the actor's inbox so its coroutine can terminate once it has drained;
            // otherwise it would stay suspended in `scope` forever after the upstream is done.
            debounced.close()
        }
    }
}
/**
 * Millisecond convenience overload of the [Duration]-based `debounce` for send channels.
 *
 * @param timeout debounce interval in milliseconds; converted with [Duration.ofMillis].
 * @param scope scope handed unchanged to the [Duration]-based overload.
 * @return the channel produced by the [Duration]-based overload.
 */
fun <T> SendChannel<T>.debounce(timeout: Long, scope: CoroutineScope): SendChannel<T> {
    val interval = Duration.ofMillis(timeout)
    return debounce(interval, scope)
}
/**
 * Wraps this channel in an actor with a conflated inbox that forwards elements to this
 * channel no more often than once per [timeout].
 *
 * The first element is forwarded immediately. For every later element, if less than
 * [timeout] has elapsed since the previous forward, the actor first waits for the remainder
 * and then forwards the element it had already taken from its inbox. The wait is converted
 * to whole milliseconds (truncating), so a remainder below one millisecond does not delay.
 *
 * This channel is not closed by the actor; its lifetime stays with the caller.
 *
 * @param timeout minimum interval between two forwarded elements.
 * @param scope scope in which the actor coroutine is started.
 * @return the actor's inbox; send to it instead of sending to this channel directly.
 */
fun <T> SendChannel<T>.debounce(
    timeout: Duration,
    scope: CoroutineScope
): SendChannel<T> {
    return scope.actor(capacity = Channel.CONFLATED) {
        val timeoutNanos = timeout.toNanos()
        // null until something has been forwarded. System.nanoTime() has an arbitrary origin
        // and may be small or negative, so a numeric sentinel such as -1 cannot reliably mean
        // "never sent" and could delay the very first element.
        var lastSendNanos: Long? = null
        channel.consumeEach { element ->
            val previousSendNanos = lastSendNanos
            if (previousSendNanos != null) {
                val elapsedNanos = System.nanoTime() - previousSendNanos
                if (elapsedNanos < timeoutNanos) {
                    // Wait out the rest of the interval, never more than one full interval.
                    val remainingNanos = (timeoutNanos - elapsedNanos)
                        .coerceAtMost(timeoutNanos)
                    delay(TimeUnit.NANOSECONDS.toMillis(remainingNanos))
                }
            }
            this@debounce.send(element)
            lastSendNanos = System.nanoTime()
        }
    }
}
/**
 * Timing-based checks for the `debounce` channel extensions.
 *
 * Each case feeds the integers 0..9 through a debounced channel with a given producer delay
 * and debounce timeout, then asserts how many elements reach the consumer. The expected
 * counts depend on wall-clock timing.
 */
class DebounceTest {

    @Test
    fun testReceiveChannelDebounce(): Unit = runBlocking {
        // Emits 0..9, pausing `delayMillis` after each element.
        fun newProducer(delayMillis: Long) = GlobalScope.produce {
            (0..9).forEach {
                send(it)
                delay(delayMillis)
            }
        }

        consumeAndAssertCount(newProducer(0).debounce(100, GlobalScope), 1)
        consumeAndAssertCount(newProducer(100).debounce(600, GlobalScope), 2)
        consumeAndAssertCount(newProducer(100).debounce(150, GlobalScope), 7)
        consumeAndAssertCount(newProducer(100).debounce(50, GlobalScope), 10)
    }

    @Test
    fun testSendChannelDebounce(): Unit = runBlocking {
        // Emits 0..9 into a debounced wrapper around the producer's own channel,
        // pausing `delay` milliseconds after each element.
        fun newProducer(delay: Long, timeout: Long) = GlobalScope.produce<Int> {
            val db = this.channel.debounce(timeout, GlobalScope)
            (0..9).forEach {
                db.send(it)
                delay(delay)
            }
        }

        consumeAndAssertCount(newProducer(delay = 0, timeout = 100), 1)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 600), 2)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 150), 7)
        consumeAndAssertCount(newProducer(delay = 100, timeout = 50), 10)
    }

    /**
     * Drains [db], printing each element with the time since the previous one, and asserts
     * that exactly [count] elements were received.
     *
     * Declared `suspend` so it runs inside the calling test's `runBlocking` instead of
     * starting a nested one.
     */
    private suspend fun consumeAndAssertCount(db: ReceiveChannel<Int>, count: Int) {
        var consumed = 0
        var lastReceived = 0L

        db.consumeEach {
            val now = System.currentTimeMillis()
            if (lastReceived > 0) {
                println("$it @ +${now - lastReceived}ms")
            } else {
                println("$it @ initial")
            }
            lastReceived = now
            consumed++
        }

        // Expected value first, so a failure reports "expected <count> but was <consumed>".
        assertEquals(count, consumed)
    }
}
@LouisCAD
Copy link

You need to close the original channel when iteration on the initial one is finished. You can do so by replacing the for loop with consumeEach, or by putting the whole loop inside a consume lambda.

@kevinherron
Copy link
Author

Thanks, updated.

@kevinherron
Copy link
Author

Updated for coroutines 1.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment