Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created November 20, 2019 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/683f172ea22e38f0c419e44c1aca56cb to your computer and use it in GitHub Desktop.
Save djspiewak/683f172ea22e38f0c419e44c1aca56cb to your computer and use it in GitHub Desktop.
final class RateLimiter[F[_]: Sync: Timer] private (chunks: Int, window: FiniteDuration, now: FiniteDuration, caution: Double) {
import RateLimiter.nowF
private val state = Ref.of[F, (Int, FiniteDuration)]((0, now))
// implements hard limiting, rather than graceful and cooperative backoff
def limiter[A]: Pipe[F, A, A] = { in =>
val actualKey = (key, window)
val cap = math.max((chunks * caution).toInt, 1)
def checkAndNext(in: Stream[F, A]): Pull[F, A, Stream[F, A]] = {
val pullF = for {
(count, start) <- state.get
now <- nowF
_ <- if (start + window < now)
state.tryUpdate(_ => (0, now))
else
().pure[F]
(count, start) <- state.get
pull <- if (count >= chunks) {
Timer[F].sleep((start + window) - now) >> checkAndNext(in)
} else {
state tryUpdate {
case (count, start) => (count + 1, start)
} map { success =>
if (success) {
in.pull.uncons flatMap {
case Some((head, tail)) =>
Pull.output(head).as(tail)
case None =>
Pull.empty
}
} else {
checkAndNext(in)
}
}
}
} yield pull
Pull.eval(pullF).flatten
}
def loop(in: Stream[F, A]): Pull[F, A, Nothing] =
checkAndNext(in).flatMap(loop)
loop(in)
}
}
object RateLimiter {
// only use 80% of the available rate limit
def apply[F[_]: Concurrent: Timer](chunks: Int, window: FiniteDuration, caution: Double = 0.8): RateLimiter[F] =
nowF[F] flatMap { now =>
Sync[F].delay(new RateLimiter[F](chunks, window, now, caution))
}
def nowF[F[_]: Timer: Functor]: F[FiniteDuration] =
Timer[F].clock.realTime(TimeUnit.MILLISECONDS).map(_.millis)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment