Skip to content

Instantly share code, notes, and snippets.

@guersam
Created March 1, 2023 13:26
Show Gist options
  • Save guersam/5ecccfb84c0d6c0f39f3182f91bacdc1 to your computer and use it in GitHub Desktop.
Save guersam/5ecccfb84c0d6c0f39f3182f91bacdc1 to your computer and use it in GitHub Desktop.
//> using scala "3.2.2"
//> using dep "nl.vroste::rezilience:0.9.2"
//> using dep "dev.zio::zio:2.0.9"
//> using dep "dev.zio::zio-test:2.0.9"
package nl.vroste.rezilience
import zio.*
import zio.stream.ZStream
import zio.test.*
import zio.test.Assertion.*
/** Spec exercising a rate limiter whose underlying stream throttle is
  * configured with a burst allowance of zero.
  */
object RezilienceRateLimiterSpec extends ZIOSpecDefault {

  val spec = suite("RateLimiter")(
    test("holds back up calls after the max") {
      ZIO.scoped {
        for {
          rl <- makeRateLimiterWithoutBurst(10, 1.second)
          // Issue 20 calls one after another on a background fiber; each call
          // yields the instant at which the limiter let it run.
          fib   <- ZIO.foreach((1 to 20).toList)(_ => rl(Clock.instant)).fork
          _     <- Clock.sleep(1.second)
          later <- Clock.instant
          times <- fib.join
          // Every call past the first 10 is expected to have run no earlier
          // than `later`, i.e. after the one-second sleep above.
        } yield assert(times.drop(10))(forall(isGreaterThanEqualTo(later)))
      }
    } @@ TestAspect.withLiveClock // run against the live clock instead of the test clock
  )

  /** Builds a [[RateLimiter]] that hands `max` and `interval` to the stream
    * throttle and passes a burst of 0.
    *
    * The stream that drains the internal queue is forked into the surrounding
    * [[Scope]].
    *
    * @param max      number of units per `interval` given to `throttleShape`;
    *                 also determines the queue capacity via `Util.nextPow2`
    * @param interval throttle window; defaults to one second
    * @return a scoped effect producing the rate limiter
    */
  def makeRateLimiterWithoutBurst(
    max: Int,
    interval: Duration = 1.second
  ): ZIO[Scope, Nothing, RateLimiter] =
    for {
      q <- Queue
             .bounded[(Ref[Boolean], UIO[Any])](
               Util.nextPow2(max)
             ) // Power of two because it is a more efficient queue implementation
      _ <- ZStream
             .fromQueue(q, maxChunkSize = 1)
             // Skip entries whose caller has already been interrupted or has finished.
             .filterZIO { case (interrupted, effect @ _) => interrupted.get.map(!_) }
             .throttleShape(
               max.toLong,
               interval,
               0 // burst allowance; max.toLong is the alternative this variant leaves out
             )(_.size.toLong)
             .mapZIOParUnordered(Int.MaxValue) { case (interrupted @ _, effect) =>
               effect
             }
             .runDrain
             .forkScoped
    } yield new RateLimiter {

      /** Enqueues a marker for `task`, waits until the throttled stream runs
        * that marker, then runs `task` itself.
        */
      override def apply[R, E, A](task: ZIO[R, E, A]): ZIO[R, E, A] = for {
        start          <- Promise.make[Nothing, Unit]
        done           <- Promise.make[Nothing, Unit]
        interruptedRef <- Ref.make(false)
        // Executed by the stream: signals the caller that it may start, then
        // waits until the caller reports completion or interruption.
        action = start.succeed(()) *> done.await
        // Flags the queue entry as no longer relevant and unblocks `action`.
        onInterruptOrCompletion = interruptedRef.set(true) *> done.succeed(())
        result <- ZIO.scoped[R] {
                    ZIO.acquireReleaseInterruptible(
                      q.offer((interruptedRef, action))
                        .onInterrupt(onInterruptOrCompletion)
                    )(
                      onInterruptOrCompletion
                    ) *> start.await *> task
                  }
      } yield result
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment