Skip to content

Instantly share code, notes, and snippets.

@timcharper
Created May 7, 2017 22:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timcharper/51b4a6c4a5291cdfe7970e3f855b2a7c to your computer and use it in GitHub Desktop.
Save timcharper/51b4a6c4a5291cdfe7970e3f855b2a7c to your computer and use it in GitHub Desktop.
Debouncer
package org.ensime.util
import akka.actor.Scheduler
import java.time.Clock
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import org.slf4j.LoggerFactory
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/**
* Generic debouncer implementation
*
* @param name The name of this debouncer
* @param scheduler The scheduler implementation to use; Can accept an akka scheduler a la magnet pattern
* @param delay The time after which to invoke action; each time [[Debouncer.call]] is invoked, this delay is reset
* @param maxDelay The maximum time to delay the invocation of action()
* @param action The action to invoke
* @param Clock The source from which we get the current time. This input should use the same source. Specified for testing purposes
*/
class Debouncer(
name: String,
scheduler: Debouncer.SchedulerLike,
delay: FiniteDuration,
maxDelay: FiniteDuration,
action: () => Unit,
clock: Clock = Clock.systemUTC
) {
require(delay <= maxDelay, "delay should be <= maxDelay")
private val logger = LoggerFactory.getLogger(getClass)
private[this] val pending = new AtomicBoolean(false)
private[this] val calls = new AtomicInteger(0)
private val delayMs = delay.toMillis
private val maxDelayMs = maxDelay.toMillis
@volatile private[this] var lastCallAttempt: Long = clock.millis()
@volatile private[this] var lastInvocation: Long = clock.millis()
def call(): Unit = {
lastCallAttempt = clock.millis()
calls.incrementAndGet()
if (pending.compareAndSet(false, true)) {
scheduler.scheduleOnce(delay) { () => tryActionOrPostpone() }
}
}
private[this] def tryActionOrPostpone(): Unit = {
val now = clock.millis()
val delaySurpassed = ((now - lastCallAttempt) >= delayMs)
val maxDelaySurpassed = ((now - lastInvocation) >= maxDelayMs)
if (delaySurpassed || maxDelaySurpassed) {
lastInvocation = now
if (pending.compareAndSet(true, false)) {
val foldedCalls = calls.getAndSet(0)
if (maxDelaySurpassed)
logger.info(
"Debouncer action {} invoked after maxDelay {} surpassed; {} calls occurred since last invocation.",
name, maxDelay, foldedCalls.toString
)
else
logger.debug(
"Debouncer action {} invoked after delay {} surpassed; {} calls occurred since last invocation.",
name, delay, foldedCalls.toString
)
try action()
catch {
case ex: Throwable => logger.error(s"Debouncer ${name} action resulted in error", ex)
}
} else
logger.error(s"Invalid state in debouncer. Should not have reached here!")
} else {
// reschedule at the earliest of lastInvocation or lastCallAttempt
val nextByDelay = (lastCallAttempt - now + delayMs)
val nextByMaxDelay = (lastInvocation - now + maxDelayMs)
scheduler.scheduleOnce(Math.min(nextByDelay, nextByMaxDelay).millis) { () => tryActionOrPostpone() }
}
}
}
object Debouncer {
def apply(
name: String,
scheduler: Debouncer.SchedulerLike,
delay: FiniteDuration,
maxDelay: FiniteDuration,
clock: Clock = Clock.systemUTC
)(action: () => Unit): Debouncer =
new Debouncer(name, scheduler, delay, maxDelay, action, clock)
trait SchedulerLike {
/**
* Scheduler method which is guaranteed to call body AFTER the provided delay passes
*/
def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit
}
object SchedulerLike {
implicit def fromAkkaScheduler(scheduler: Scheduler)(
implicit
ec: ExecutionContext
): SchedulerLike = new SchedulerLike {
def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit =
scheduler.scheduleOnce(delay)(body())
}
}
}
package org.ensime.util
import java.time.{ Clock, Instant, ZoneId }
import org.scalatest._
import scala.concurrent.duration._
class DebouncerSpec extends FlatSpec with Matchers {
trait Fixture {
class TestScheduler(clock: Clock) extends Debouncer.SchedulerLike {
@volatile var action: () => Unit = _
@volatile var nextInvocation: Long = Long.MaxValue
@volatile var currentTime: Long = 0L
override def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit = synchronized {
action = body
nextInvocation = currentTime + delay.toMillis
}
def poll(): Unit = synchronized {
currentTime = clock.millis()
if (currentTime >= nextInvocation) {
nextInvocation = Long.MaxValue
action()
}
}
}
class TestClock extends Clock {
@volatile var currentTime: Long = 0
def advanceTime(by: Long): Unit = synchronized {
require(by > 0)
currentTime += by
}
override def instant(): Instant =
Instant.ofEpochMilli(currentTime)
override def getZone(): ZoneId =
java.time.ZoneOffset.UTC
override def withZone(zone: ZoneId) = ???
}
val clock = new TestClock
val scheduler = new TestScheduler(clock)
def advanceTime(amount: FiniteDuration): Unit = {
clock.advanceTime(amount.toMillis)
scheduler.poll()
}
var called = false
val delay = 5.seconds
val maxDelay = 20.seconds
lazy val debouncer = Debouncer("test", scheduler, delay, maxDelay, clock) { () =>
called = true
}
}
it should "call the action only after the delay has passed" in new Fixture {
debouncer.call()
called.shouldBe(false)
advanceTime(delay)
called.shouldBe(true)
}
it should "extend the deadline if called before the delay has passed" in new Fixture {
debouncer.call()
called.shouldBe(false)
advanceTime(2.seconds)
called.shouldBe(false)
debouncer.call()
advanceTime(4.seconds)
called.shouldBe(false)
advanceTime(delay)
called.shouldBe(true)
}
it should "call after maxDelay has passed" in new Fixture {
(1 to 20).foreach { _ =>
debouncer.call()
called.shouldBe(false)
advanceTime(1.second)
}
called.shouldBe(true)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment