@ifraixedes
Last active April 19, 2022 17:22

Failrate

This implementation was developed for Storj Labs and was part of some of their systems; it was eventually removed because it was no longer needed, and I keep it in this gist for future reference.

See doc.go file for more information about what this package provides.
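
The following is a minimal usage sketch (not part of the original package; login and checkPassword are hypothetical names used only for illustration). It shows the contract described in doc.go: ask Allow for permission, run the operation, then report the outcome so that only failures count against the key.

import (
	"context"
	"fmt"
)

// login guards a password check with a per-key failure rate limiter. It assumes
// the failrate package below is imported; checkPassword stands in for the real
// operation being protected.
func login(ctx context.Context, limiters *failrate.Limiters, userID, password string) error {
	allowed, succeeded, failed, delay := limiters.Allow(ctx, userID)
	if !allowed {
		// The key has exhausted its allowance; report when to retry.
		return fmt.Errorf("too many failed attempts, retry in %s", delay)
	}
	if err := checkPassword(userID, password); err != nil {
		failed() // a failed operation counts towards the key's limit
		return err
	}
	succeeded() // a success rolls back the consumed allowance
	return nil
}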

// Package failrate provides a rate limiter that keeps a limiter per key and
// only counts failed operations towards the limit.
//
// A key is a unique string that identifies the entity whose operations are
// rate-limited.
//
// After an operation is executed, the caller must tell the limiter whether it
// succeeded or failed.
//
// When an operation fails, the rate limiter creates and registers a limiter
// for the specified key if one isn't already registered.
// Subsequent operations (for the specified key) are checked through the
// limiter, and the count towards the limit is cancelled when the operation
// succeeds.
//
// Successful operations don't change the limit, while failed ones count
// towards it.
//
// Because it is impossible to know in advance whether an operation will
// succeed or fail, a request is refused when the limiter has reached its
// allowance, regardless of what its result would have been, because that's the
// purpose of the limiter (i.e. to limit the number of calls to an operation).
//
// Rate limiters are unregistered and deleted in a least-recently-used fashion,
// or when an operation request for a specific key succeeds and its registered
// limiter is back to its initial state (i.e. the limiter has its full
// allowance).
package failrate
module failrate
go 1.18
require (
github.com/spacemonkeygo/monkit/v3 v3.0.17
github.com/stretchr/testify v1.7.1
github.com/zeebo/errs v1.3.0
golang.org/x/time v0.0.0-20220411224347-583f2d630306
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf
storj.io/gateway-mt v1.27.1
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/pprof v0.0.0-20211108044417-e9b028704de0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
)
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/pprof v0.0.0-20211108044417-e9b028704de0 h1:rsq1yB2xiFLDYYaYdlGBsSkwVzsCo500wMhxvW5A/bk=
github.com/google/pprof v0.0.0-20211108044417-e9b028704de0/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spacemonkeygo/monkit/v3 v3.0.17 h1:rqIuLhRUr2UtS3WNVbPY/BwvjlwKVvSOVY5p0QVocxE=
github.com/spacemonkeygo/monkit/v3 v3.0.17/go.mod h1:kj1ViJhlyADa7DiA4xVnTuPA46lFKbM7mxQTrXCuJP4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/zeebo/errs v1.3.0 h1:hmiaKqgYZzcVgRL1Vkc1Mn2914BbzB0IBxs+ebeutGs=
github.com/zeebo/errs v1.3.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838 h1:71vQrMauZZhcTVK6KdYM+rklehEEwb3E+ZhaE5jrPrE=
golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20220411224347-583f2d630306 h1:+gHMid33q6pen7kv9xvT+JRinntgeXO2AeZVd0AWD3w=
golang.org/x/time v0.0.0-20220411224347-583f2d630306/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf h1:D5xZTDOlTTQWdAWeKKm2pFLcz1sceH+f/pVAcYB9jL8=
storj.io/common v0.0.0-20220414110316-a5cb7172d6bf/go.mod h1:LBJrpAqL4MNSrhGEwc8SJ+tIVtgfCtFEZqDy6/0j67A=
storj.io/drpc v0.0.30 h1:jqPe4T9KEu3CDBI05A2hCMgMSHLtd/E0N0yTF9QreIE=
storj.io/gateway-mt v1.27.1 h1:lzz36v83L9FE2CZ9J1UkztEsMo1og4bOFcg03vsvwwc=
storj.io/gateway-mt v1.27.1/go.mod h1:v1nh/EEVghQADf1BxaIu3iQLzcPKrG8vSXWtN95EIhQ=
package failrate
import (
"context"
"net/http"
"sync"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"golang.org/x/time/rate"
"storj.io/common/lrucache"
"storj.io/gateway-mt/pkg/trustedip"
)
var mon = monkit.Package()
// LimitersConfig configures a failure rate limiter.
type LimitersConfig struct {
MaxReqsSecond int `help:"maximum number of allowed operations per second, counted from the moment the first failed operation happens" default:"2" testDefault:"1"`
Burst int `help:"maximum number of operations allowed to exceed the maximum operations per second" default:"3" testDefault:"1"`
NumLimits int `help:"maximum number of keys/rate-limit pairs stored in the LRU cache" default:"1000" testDefault:"10"`
}
// Limiters registers a rate limiter per key when an operation is marked as
// failed, allowing subsequent operations on the registered keys to be tracked
// and the failed ones to be counted towards the limit.
//
// Successful operations don't count towards the rate limit; a success rolls
// back its consumed allowance and, once the limiter is back to its initial
// state, the key is unregistered.
type Limiters struct {
limiters *lrucache.ExpiringLRU
limit rate.Limit
burst int
}
// NewLimiters creates a Limiters, returning an error if c.MaxReqsSecond,
// c.Burst, or c.NumLimits is zero or negative.
func NewLimiters(c LimitersConfig) (*Limiters, error) {
if c.MaxReqsSecond <= 0 {
return nil, errs.New("MaxReqsSecond cannot be zero or negative")
}
if c.Burst <= 0 {
return nil, errs.New("Burst cannot be zero or negative")
}
if c.NumLimits <= 0 {
return nil, errs.New("NumLimits cannot be zero or negative")
}
return &Limiters{
limiters: lrucache.New(lrucache.Options{Capacity: c.NumLimits}),
limit: 1 / rate.Limit(c.MaxReqsSecond), // minimum interval between requests: one token replenished every MaxReqsSecond seconds
burst: c.Burst,
}, nil
}
// Allow returns true, non-nil succeeded and failed functions, and a zero delay
// if key is allowed to perform an operation; otherwise it returns false,
// succeeded and failed are nil, and delay is greater than 0.
//
// key is allowed to make the request if it isn't tracked, or if it's tracked
// but hasn't reached the limit.
//
// When key isn't tracked, it gets tracked once failed is executed, and
// subsequent Allow calls with key will be rate-limited. succeeded untracks the
// key when the rate limit doesn't apply anymore. For this reason the caller
// MUST always call succeeded or failed when true is returned.
func (irl *Limiters) Allow(ctx context.Context, key string) (allowed bool, succeeded func(), failed func(), delay time.Duration) {
finish := mon.Task()(&ctx)
v, ok := irl.limiters.GetCached(key)
if ok {
rl := v.(*limiter)
allowed, delay, rollback := rl.Allow(ctx)
if !allowed {
mon.Counter("fail_rate_limiting_banning").Inc(1)
return false, nil, nil, delay
}
// When the key is already tracked, failed func doesn't have to do anything.
return true, func() {
defer finish(nil)
// The operation has succeeded, hence roll back the consumed rate-limit
// allowance.
rollback()
if rl.IsOnInitState() {
irl.limiters.Delete(key)
mon.Counter("fail_rate_limiting_entries").Dec(1)
monkit.SpanFromCtx(ctx).Annotate("Delete rate-limiter for", key)
}
}, func() { finish(nil) }, 0
}
return true, func() { finish(nil) }, func() {
defer finish(nil)
// The operation has failed, hence we start to rate-limit the key.
rl := newRateLimiter(irl.limit, irl.burst)
irl.limiters.Add(key, rl)
mon.Counter("fail_rate_limiting_entries").Inc(1)
monkit.SpanFromCtx(ctx).Annotate("Create rate-limiter for", key)
// Consume one operation, which is this failed one.
rl.Allow(ctx)
}, 0
}
// limitersAllowReqTrustAnyIP avoids having to call trustedip.NewListTrustAll
// on every Limiters.AllowReq call.
var limitersAllowReqTrustAnyIP = trustedip.NewListTrustAll()
// AllowReq uses the client IP from r as the key to call the Allow method.
//
// It gets the client IP from the 'Forwarded', 'X-Forwarded-For', or
// 'X-Real-Ip' headers, returning it from the first of those headers that is
// present, checked in that specific order; if none of those headers exist, it
// gets the IP from r.RemoteAddr.
// It panics if r is nil.
func (irl *Limiters) AllowReq(r *http.Request) (allowed bool, succeeded func(), failed func(), delay time.Duration) {
ctx := r.Context()
defer mon.Task()(&ctx)(nil)
ip := trustedip.GetClientIP(limitersAllowReqTrustAnyIP, r)
return irl.Allow(ctx, ip)
}
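// The following sketch (added for illustration; not part of the original
// implementation) shows how AllowReq could guard an HTTP handler, assuming a
// hypothetical authenticate helper; only failed authentications count against
// the client IP:
//
//	func protect(limiters *Limiters, next http.HandlerFunc) http.HandlerFunc {
//		return func(w http.ResponseWriter, r *http.Request) {
//			allowed, succeeded, failed, _ := limiters.AllowReq(r)
//			if !allowed {
//				http.Error(w, "too many failed requests", http.StatusTooManyRequests)
//				return
//			}
//			if err := authenticate(r); err != nil {
//				failed() // only failures count against the client IP
//				http.Error(w, "unauthorized", http.StatusUnauthorized)
//				return
//			}
//			succeeded()
//			next(w, r)
//		}
//	}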
// limiter is a wrapper around rate.Limiter to suit the Limiters
// requirements.
type limiter struct {
limiter *rate.Limiter
mu sync.Mutex
reservation *reservation
}
func newRateLimiter(limit rate.Limit, burst int) *limiter {
return &limiter{
limiter: rate.NewLimiter(limit, burst),
}
}
// IsOnInitState returns true if the rate-limiter is back to its full
// allowance, as when it was just created.
func (rl *limiter) IsOnInitState() bool {
now := time.Now()
rsvt := rl.limiter.ReserveN(now, rl.limiter.Burst())
// Cancel the reservation immediately because we are only interested in
// finding out the delay of executing as many operations as the burst.
// Using the same time at which the reservation was created allows cancelling
// it even though it has already been consumed at this moment.
rsvt.CancelAt(now)
return rsvt.Delay() == 0
}
// Allow returns true when the operation is allowed to be performed, together
// with a rollback function for returning the consumed token so that it doesn't
// count towards the rate limiting of future calls. Otherwise it returns false
// and the time duration that the caller must wait until being allowed to
// perform the operation, and rollback is nil because there is no allowed
// operation to roll back.
func (rl *limiter) Allow(ctx context.Context) (_ bool, _ time.Duration, rollback func()) {
defer mon.Task()(&ctx)(nil)
now := time.Now()
rl.mu.Lock()
defer rl.mu.Unlock()
rsvt := rl.reservation
if rsvt == nil {
rsvt = newReservation(rl.limiter, now)
}
if d := rsvt.Delay(now); d > 0 {
// If there is an imposed delay, it means that the reserved token cannot
// be consumed right now, so the operation isn't allowed.
rl.reservation = rsvt
return false, d, nil
}
// The reservation can be consumed now, so we don't need to hold it anymore.
rl.reservation = nil
return true, 0, func() {
// Cancel the reservation when this allowance is rolled back.
rsvt.Cancel()
}
}
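// To make the interplay with reservation concrete, here is an illustrative
// timeline (not from the original code) for a limiter with limit 0.5 (one
// token every 2 seconds) and burst 1:
//
//	t=0.0s Allow reserves the only token with no delay: allowed. The caller
//	       reports a failure, so rollback is never called and the token stays
//	       consumed.
//	t=0.5s Allow reserves the next token, which only becomes available at
//	       t=2.0s: the reservation is held and (false, 1.5s, nil) is returned.
//	t=2.0s Allow finds the held reservation with no remaining delay: it is
//	       consumed and the operation is allowed again.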
// reservation is a wrapper of rate.Reservation which holds the time at which
// it was created, so that it can be cancelled in the future retrospectively to
// its creation time.
type reservation struct {
r *rate.Reservation
createdAt time.Time
}
func newReservation(limiter *rate.Limiter, now time.Time) *reservation {
return &reservation{
r: limiter.ReserveN(now, 1),
createdAt: now,
}
}
// Delay returns the time that the caller should wait to consume it.
//
// Basically it's a wrapper around rate.Reservation.DelayFrom; check its
// documentation for further information.
func (rsvp *reservation) Delay(now time.Time) time.Duration {
return rsvp.r.DelayFrom(now)
}
// Cancel cancels the reservation retrospectively to its creation time.
//
// Basically it's a wrapper around rate.Reservation.CancelAt passing its
// creation time; check its documentation for further information.
func (rsvp *reservation) Cancel() {
rsvp.r.CancelAt(rsvp.createdAt)
}
package failrate
import (
"fmt"
"net/http"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"storj.io/common/lrucache"
"storj.io/common/testcontext"
"storj.io/common/testrand"
)
func TestLimiters(t *testing.T) {
const ip = "172.28.254.80"
ctx := testcontext.New(t)
req := &http.Request{
RemoteAddr: "10.5.2.23",
Header: map[string][]string{
"X-Forwarded-For": {fmt.Sprintf("%s, 192.168.80.25", ip)},
"Forwarded": {fmt.Sprintf("for=%s, for=172.17.5.10", ip)},
"X-Real-Ip": {ip},
},
}
req = req.WithContext(ctx)
limiters, err := NewLimiters(LimitersConfig{MaxReqsSecond: 2, Burst: 3, NumLimits: 1})
require.NoError(t, err)
{ // Successful requests don't count towards rate limiting the IP.
for i := 1; i <= 10; i++ {
allowed, succeeded, _, _ := limiters.AllowReq(req)
require.Truef(t, allowed, "AllowReq: request %d", i)
succeeded()
}
for i := 1; i <= 10; i++ {
allowed, succeeded, _, _ := limiters.Allow(ctx, ip)
require.Truef(t, allowed, "Allow: request %d", i)
succeeded()
}
assertLRUContains(t, limiters.limiters, ip, false, "IP with successful requests doesn't have assigned a rate limiter")
}
{ // Failed requests count towards rate limiting the IP.
for i := 1; i <= 2; i++ {
allowed, _, failed, _ := limiters.AllowReq(req)
require.Truef(t, allowed, "AllowReq: request %d", i)
failed()
}
// Execute the last allowed one, but using the key (i.e. the IP) directly.
allowed, _, failed, _ := limiters.Allow(ctx, ip)
require.True(t, allowed, "Allow: request 3")
failed()
baseDelay := 2 * time.Second
for i := 4; i <= 5; i++ {
allowed, _, _, delay := limiters.AllowReq(req)
assert.Falsef(t, allowed, "AllowReq: request %d", i)
assert.LessOrEqual(t, delay, baseDelay, "retry duration")
baseDelay += time.Second / 2
}
// Execute another one that isn't allowed, but using the key (i.e. the IP) directly.
allowed, _, _, _ = limiters.Allow(ctx, ip)
assert.False(t, allowed, "Allow: request 6")
}
{ // New key evicts the oldest one when the cache size is reached.
const key = "new-key-evicts-older-one"
allowed, _, failed, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
failed()
assertLRUContains(t, limiters.limiters, ip, false, "previous key should have been removed")
}
{ // Succeeded removes an existing rate limit when it reaches the initial state.
const key = "will-be-at-init-state"
assertLRUContains(t, limiters.limiters, ip, false, "the previously evicted key shouldn't be in the cache")
allowed, _, failed, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
// A failed operation counts towards the rate limit.
failed()
rateLimitStarted := time.Now() // the rate limiting started with the previous failed call.
assertLRUContains(t, limiters.limiters, key, true, "failed key should be in the cache")
allowed, succeeded, _, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
assertLRUContains(t, limiters.limiters, key, true, "allow shouldn't remove the key from the cache")
succeeded()
// Wait until the rate-limiter associated with the key is back to its initial
// state, that is, until it can reserve a number of operations equal to the
// burst without any delay.
time.Sleep(time.Until(rateLimitStarted.Add(2 * time.Second)))
allowed, succeeded, _, _ = limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
// Succeeded removes a tracked rate-limiter when it's back to its initial state.
succeeded()
// Verify that the rate-limiter has been untracked.
assertLRUContains(t, limiters.limiters, key, false, "succeeded should remove the key from the cache")
}
{ // Cheaters cannot use successful operations to bypass it.
const key = "cheater"
for i := 1; i <= 2; i++ {
allowed, _, failed, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
// A failed operation counts towards the rate limit.
failed()
}
// This operation is still allowed because of the burst allowance.
allowed, succeeded, _, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
// A succeeded operation doesn't count towards the rate limit.
succeeded()
assertLRUContains(t, limiters.limiters, key, true,
"one succeeded operation shouldn't remove the key from the cache when there is not delay",
)
// This operation is still allowed because of the burst allowance and because
// the previous one succeeded, so it wasn't counted by the rate limiter.
allowed, _, failed, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow")
failed()
// This operation is rate limited because the rate limiter wasn't cleared by
// the last succeeded operation and the burst allowance has been surpassed.
allowed, _, _, _ = limiters.Allow(ctx, key)
assert.False(t, allowed, "Allow")
}
t.Run("not allowed key is allowed again if it waits for the delay for the following request", func(t *testing.T) {
const key = "no-allowed-wait-allowed-again"
limiters, err := NewLimiters(LimitersConfig{MaxReqsSecond: 1, Burst: 1, NumLimits: 1})
require.NoError(t, err)
// Explicitly reduce the limiter's interval between calls below the minimum
// that the configuration allows, to speed up the test.
limiters.limit = rate.Every(time.Millisecond)
allowed, _, failed, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow: call 1")
failed()
allowed, _, _, delay := limiters.Allow(ctx, key)
// NOTE: this assertion would fail if this second call to Allow executed one
// millisecond or more after the previous one, because a new token would have
// been replenished; if this test turns out to be flaky we should use a greater
// duration for limiters.limit at the expense of increasing the test running
// time.
assert.False(t, allowed, "Allow: call 2")
assert.LessOrEqual(t, delay, time.Millisecond, "retry duration")
time.Sleep(time.Millisecond)
allowed, succeeded, _, _ := limiters.Allow(ctx, key)
require.True(t, allowed, "Allow: call after wait")
succeeded()
})
}
func TestNewLimiters(t *testing.T) {
limiters, err := NewLimiters(LimitersConfig{
MaxReqsSecond: 5, Burst: 1, NumLimits: 1,
})
require.NoError(t, err)
require.NotNil(t, limiters)
}
func TestNewLimiters_error(t *testing.T) {
testCases := []struct {
desc string
config LimitersConfig
}{
{
desc: "zero max reqs per second",
config: LimitersConfig{MaxReqsSecond: 0, Burst: 2, NumLimits: 1},
},
{
desc: "negative max reqs per second",
config: LimitersConfig{MaxReqsSecond: -1, Burst: 5, NumLimits: 1},
},
{
desc: "zero burst",
config: LimitersConfig{MaxReqsSecond: 9, Burst: 0, NumLimits: 1},
},
{
desc: "negative burst",
config: LimitersConfig{MaxReqsSecond: 15, Burst: -5, NumLimits: 1},
},
{
desc: "zero num limits",
config: LimitersConfig{MaxReqsSecond: 5, Burst: 3, NumLimits: 0},
},
{
desc: "negative num limits",
config: LimitersConfig{MaxReqsSecond: 5, Burst: 1, NumLimits: -3},
},
{
desc: "negative max reqs per second and num limits",
config: LimitersConfig{MaxReqsSecond: -2, Burst: 10, NumLimits: -1},
},
{
desc: "zero burst and negative num limits",
config: LimitersConfig{MaxReqsSecond: 3, Burst: -1, NumLimits: -1},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
_, err := NewLimiters(tC.config)
require.Error(t, err)
})
}
}
func TestLimiters_concurrency(t *testing.T) {
limiters, err := NewLimiters(LimitersConfig{MaxReqsSecond: 2, Burst: 1, NumLimits: 2})
require.NoError(t, err)
// Explicitly reduce the limiter's interval between calls below the minimum
// that the configuration allows, to speed up the test.
limiters.limit = rate.Every(time.Millisecond)
const iterations = 50
ctx := testcontext.New(t)
// Target key1
ctx.Go(func() error {
for i := 0; i < iterations; i++ {
allow, succeeded, failed, delay := limiters.Allow(ctx, "key1")
if !allow {
time.Sleep(delay)
} else {
if testrand.Intn(2) == 0 {
failed()
} else {
succeeded()
}
}
}
return nil
})
// Target key1
ctx.Go(func() error {
for i := 0; i < iterations; i++ {
allow, succeeded, failed, delay := limiters.Allow(ctx, "key1")
if !allow {
time.Sleep(delay)
} else {
if testrand.Intn(2) == 0 {
failed()
} else {
succeeded()
}
}
}
return nil
})
// Target key2
ctx.Go(func() error {
for i := 0; i < iterations; i++ {
allow, succeeded, failed, delay := limiters.Allow(ctx, "key2")
if !allow {
time.Sleep(delay)
} else {
if testrand.Intn(2) == 0 {
failed()
} else {
succeeded()
}
}
}
return nil
})
// Target key2
ctx.Go(func() error {
for i := 0; i < iterations; i++ {
allow, succeeded, failed, delay := limiters.Allow(ctx, "key2")
if !allow {
time.Sleep(delay)
} else {
if testrand.Intn(2) == 0 {
failed()
} else {
succeeded()
}
}
}
return nil
})
ctx.Wait()
}
func assertLRUContains(t *testing.T, lru *lrucache.ExpiringLRU, key string, contains bool, msg string) {
t.Helper()
_, cached := lru.GetCached(key)
assert.Equal(t, contains, cached, msg)
}