Scala key-based rate limiter with automatic key eviction.
Depends on Guava and Akka Actors.
Scala key-based rate limiter with automatic key eviction.
Depends on Guava and Akka Actors.
import java.util.concurrent.TimeUnit.SECONDS | |
import java.util.concurrent.atomic.AtomicInteger | |
import akka.actor.Scheduler | |
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration.FiniteDuration | |
/**
 * A local, in-memory rate limiter keyed by `K`.
 *
 * The rate limiter loosely ensures that no more than `threshold` accesses are
 * permitted per key inside the window of time. "Loosely" because the quota
 * check and the increment in [[attempt]] are two separate steps, so concurrent
 * callers for the same key can briefly overshoot the threshold.
 *
 * Every permitted access increments a per-key counter and schedules a matching
 * decrement after `window`. Counters are held in a Guava cache whose entries
 * expire once they have not been accessed for `window`.
 *
 * @param threshold maximum number of accesses permitted per key within `window`
 * @param window    how long a permitted access counts against the key's quota;
 *                  also the idle time after which a key's counter is evicted
 * @param scheduler scheduler used to release each permitted access after `window`
 * @param ec        execution context on which the scheduled release runs
 * @tparam K        type of the key being rate limited
 */
class RateLimiter[K](
  threshold: Int,
  window: FiniteDuration
)(
  implicit scheduler: Scheduler,
  ec: ExecutionContext
) {

  private val cache: LoadingCache[K, AtomicInteger] = CacheBuilder
    .newBuilder()
    // Pass the window in its own unit. Converting with `toSeconds` truncates
    // sub-second windows to 0, and a zero expiry makes Guava drop entries
    // immediately, which would silently disable rate limiting.
    .expireAfterAccess(window.length, window.unit)
    .build(new CacheLoader[K, AtomicInteger] {
      // A fresh key has used none of its quota yet; `attempt` does the counting.
      def load(key: K) = new AtomicInteger(0)
    })
    .asInstanceOf[LoadingCache[K, AtomicInteger]]

  /**
   * Testing does not count against the key's quota.
   * @return false if a key is currently rate limited.
   */
  def test(k: K): Boolean =
    // An absent key counts as zero accesses; no counter is created for it.
    Option(cache.getIfPresent(k)).fold(0)(_.get) < threshold

  /**
   * Checking for a key does not count against its quota.
   * @return true if a counter is currently held for the key.
   */
  def contains(k: K): Boolean = cache.asMap.containsKey(k)

  /**
   * Performing the action counts against the key's quota.
   * @return Some result of the action if the key is not rate limited.
   *         If none, the action was not invoked.
   */
  def attempt[A](k: K)(action: => A): Option[A] = {
    val counter = cache.get(k)
    if (counter.get < threshold) {
      counter.incrementAndGet()
      // Release against the counter captured above instead of looking the key
      // up again: a second lookup would refresh the entry's expiry timer and,
      // if the entry had been evicted in the meantime, would decrement a
      // different counter than the one incremented here.
      scheduler.scheduleOnce(window)(counter.decrementAndGet())
      Some(action)
    } else None
  }
}
To avoid a dependency on Akka actors you could make the cache value a Guava
`RateLimiter`, but the `AtomicInteger` approach is a bit lighter if you
already have Akka.