Created
May 7, 2017 22:31
-
-
Save timcharper/51b4a6c4a5291cdfe7970e3f855b2a7c to your computer and use it in GitHub Desktop.
Debouncer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.ensime.util | |
import akka.actor.Scheduler | |
import java.time.Clock | |
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } | |
import org.slf4j.LoggerFactory | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration._ | |
/** | |
* Generic debouncer implementation | |
* | |
* @param name The name of this debouncer | |
* @param scheduler The scheduler implementation to use; Can accept an akka scheduler a la magnet pattern | |
* @param delay The time after which to invoke action; each time [[Debouncer.call]] is invoked, this delay is reset | |
* @param maxDelay The maximum time to delay the invocation of action() | |
* @param action The action to invoke | |
* @param Clock The source from which we get the current time. This input should use the same source. Specified for testing purposes | |
*/ | |
class Debouncer( | |
name: String, | |
scheduler: Debouncer.SchedulerLike, | |
delay: FiniteDuration, | |
maxDelay: FiniteDuration, | |
action: () => Unit, | |
clock: Clock = Clock.systemUTC | |
) { | |
require(delay <= maxDelay, "delay should be <= maxDelay") | |
private val logger = LoggerFactory.getLogger(getClass) | |
private[this] val pending = new AtomicBoolean(false) | |
private[this] val calls = new AtomicInteger(0) | |
private val delayMs = delay.toMillis | |
private val maxDelayMs = maxDelay.toMillis | |
@volatile private[this] var lastCallAttempt: Long = clock.millis() | |
@volatile private[this] var lastInvocation: Long = clock.millis() | |
def call(): Unit = { | |
lastCallAttempt = clock.millis() | |
calls.incrementAndGet() | |
if (pending.compareAndSet(false, true)) { | |
scheduler.scheduleOnce(delay) { () => tryActionOrPostpone() } | |
} | |
} | |
private[this] def tryActionOrPostpone(): Unit = { | |
val now = clock.millis() | |
val delaySurpassed = ((now - lastCallAttempt) >= delayMs) | |
val maxDelaySurpassed = ((now - lastInvocation) >= maxDelayMs) | |
if (delaySurpassed || maxDelaySurpassed) { | |
lastInvocation = now | |
if (pending.compareAndSet(true, false)) { | |
val foldedCalls = calls.getAndSet(0) | |
if (maxDelaySurpassed) | |
logger.info( | |
"Debouncer action {} invoked after maxDelay {} surpassed; {} calls occurred since last invocation.", | |
name, maxDelay, foldedCalls.toString | |
) | |
else | |
logger.debug( | |
"Debouncer action {} invoked after delay {} surpassed; {} calls occurred since last invocation.", | |
name, delay, foldedCalls.toString | |
) | |
try action() | |
catch { | |
case ex: Throwable => logger.error(s"Debouncer ${name} action resulted in error", ex) | |
} | |
} else | |
logger.error(s"Invalid state in debouncer. Should not have reached here!") | |
} else { | |
// reschedule at the earliest of lastInvocation or lastCallAttempt | |
val nextByDelay = (lastCallAttempt - now + delayMs) | |
val nextByMaxDelay = (lastInvocation - now + maxDelayMs) | |
scheduler.scheduleOnce(Math.min(nextByDelay, nextByMaxDelay).millis) { () => tryActionOrPostpone() } | |
} | |
} | |
} | |
object Debouncer { | |
def apply( | |
name: String, | |
scheduler: Debouncer.SchedulerLike, | |
delay: FiniteDuration, | |
maxDelay: FiniteDuration, | |
clock: Clock = Clock.systemUTC | |
)(action: () => Unit): Debouncer = | |
new Debouncer(name, scheduler, delay, maxDelay, action, clock) | |
trait SchedulerLike { | |
/** | |
* Scheduler method which is guaranteed to call body AFTER the provided delay passes | |
*/ | |
def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit | |
} | |
object SchedulerLike { | |
implicit def fromAkkaScheduler(scheduler: Scheduler)( | |
implicit | |
ec: ExecutionContext | |
): SchedulerLike = new SchedulerLike { | |
def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit = | |
scheduler.scheduleOnce(delay)(body()) | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.ensime.util | |
import java.time.{ Clock, Instant, ZoneId } | |
import org.scalatest._ | |
import scala.concurrent.duration._ | |
class DebouncerSpec extends FlatSpec with Matchers { | |
trait Fixture { | |
class TestScheduler(clock: Clock) extends Debouncer.SchedulerLike { | |
@volatile var action: () => Unit = _ | |
@volatile var nextInvocation: Long = Long.MaxValue | |
@volatile var currentTime: Long = 0L | |
override def scheduleOnce(delay: FiniteDuration)(body: () => Unit): Unit = synchronized { | |
action = body | |
nextInvocation = currentTime + delay.toMillis | |
} | |
def poll(): Unit = synchronized { | |
currentTime = clock.millis() | |
if (currentTime >= nextInvocation) { | |
nextInvocation = Long.MaxValue | |
action() | |
} | |
} | |
} | |
class TestClock extends Clock { | |
@volatile var currentTime: Long = 0 | |
def advanceTime(by: Long): Unit = synchronized { | |
require(by > 0) | |
currentTime += by | |
} | |
override def instant(): Instant = | |
Instant.ofEpochMilli(currentTime) | |
override def getZone(): ZoneId = | |
java.time.ZoneOffset.UTC | |
override def withZone(zone: ZoneId) = ??? | |
} | |
val clock = new TestClock | |
val scheduler = new TestScheduler(clock) | |
def advanceTime(amount: FiniteDuration): Unit = { | |
clock.advanceTime(amount.toMillis) | |
scheduler.poll() | |
} | |
var called = false | |
val delay = 5.seconds | |
val maxDelay = 20.seconds | |
lazy val debouncer = Debouncer("test", scheduler, delay, maxDelay, clock) { () => | |
called = true | |
} | |
} | |
it should "call the action only after the delay has passed" in new Fixture { | |
debouncer.call() | |
called.shouldBe(false) | |
advanceTime(delay) | |
called.shouldBe(true) | |
} | |
it should "extend the deadline if called before the delay has passed" in new Fixture { | |
debouncer.call() | |
called.shouldBe(false) | |
advanceTime(2.seconds) | |
called.shouldBe(false) | |
debouncer.call() | |
advanceTime(4.seconds) | |
called.shouldBe(false) | |
advanceTime(delay) | |
called.shouldBe(true) | |
} | |
it should "call after maxDelay has passed" in new Fixture { | |
(1 to 20).foreach { _ => | |
debouncer.call() | |
called.shouldBe(false) | |
advanceTime(1.second) | |
} | |
called.shouldBe(true) | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment