Last active
July 11, 2022 07:55
-
-
Save joostjager/bca727bdd4fc806e4c0050e12838ffa3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"time" | |
"go.uber.org/zap" | |
"go.uber.org/zap/zapcore" | |
"golang.org/x/time/rate" | |
) | |
const ( | |
defaultInterval = 100 * time.Millisecond | |
maxInterval = 3200 * time.Millisecond | |
burst = 5 | |
spammers = 3 | |
// perfectSpammers configures the spammer to have perfect knowledge of the | |
// rate limit and always stay right below it. | |
perfectSpammers = false | |
) | |
func main() { | |
node := newNode() | |
go node.run() | |
// send sends a packet from the specified source to the simulated node. | |
send := func(source string, logFail bool, ignoreLimitExceedingPackets bool) { | |
ch := make(chan bool) | |
node.incoming <- &packet{ | |
source: source, | |
success: ch, | |
ignoreLimitExceedingPackets: ignoreLimitExceedingPackets, | |
} | |
success := <-ch | |
if success { | |
log.Infow("Success packet", "source", source) | |
} else if logFail { | |
log.Errorw("Failed packet", "source", source) | |
} | |
} | |
// start a loop for a legitimate user sending packets at a low rate. | |
go func() { | |
for { | |
send("user", true, false) | |
time.Sleep(500 * time.Millisecond) | |
} | |
}() | |
// send spam packets from multiple sources. | |
for { | |
spammerIndex := rand.Int31n(spammers) | |
source := fmt.Sprintf("spammer-%v", spammerIndex) | |
send(source, false, perfectSpammers) | |
time.Sleep(10 * time.Millisecond) | |
} | |
} | |
type packet struct { | |
source string | |
success chan bool | |
ignoreLimitExceedingPackets bool | |
} | |
type node struct { | |
incoming chan *packet | |
} | |
type limiter struct { | |
lim *rate.Limiter | |
increaseTime time.Time | |
interval time.Duration | |
} | |
func newNode() *node { | |
return &node{ | |
incoming: make(chan *packet), | |
} | |
} | |
func (n *node) run() error { | |
inLimiters := make(map[string]*limiter) | |
// Create an output limiter. Normally a node doesn't do output limiting and | |
// it is the next node that limits its input. This is only a simplification | |
// for this test. | |
outLimiter := rate.NewLimiter(rate.Every(defaultInterval), burst) | |
for { | |
select { | |
case in := <-n.incoming: | |
// Get incoming limiter for this source. | |
lim, ok := inLimiters[in.source] | |
if !ok { | |
// Create limiter if not exists. | |
lim = &limiter{ | |
interval: defaultInterval, | |
lim: rate.NewLimiter(rate.Every(defaultInterval), burst), | |
} | |
inLimiters[in.source] = lim | |
} | |
// Check input limiter. | |
inAllow := lim.lim.Allow() | |
// If input is allowed, check output limiter (which really is the | |
// input limiter of the next node). | |
var outAllow bool | |
if inAllow { | |
outAllow = outLimiter.Allow() | |
} | |
// If a limit is exceeded, halve the rate. | |
if !inAllow || !outAllow { | |
// ignoreLimitExceedingPackets is a trick to simulate a spammer | |
// with perfect knowledge. They tripped the rate limit, but we | |
// act here as if it didn't happen because a perfect spammer | |
// wouldn't have sent this packet. | |
if in.ignoreLimitExceedingPackets { | |
in.success <- false | |
continue | |
} | |
newInterval := lim.interval * 2 | |
if newInterval < maxInterval { | |
log.Infow("Increasing interval", "source", in.source, | |
"newInterval", newInterval, "inputLimitExceeded", !inAllow) | |
lim.interval = newInterval | |
lim.lim.SetLimit(rate.Every(newInterval)) | |
lim.increaseTime = time.Now().Add(5 * time.Second) | |
} | |
in.success <- false | |
continue | |
} | |
in.success <- true | |
// Periodic timeout to keep checking for increases. | |
case <-time.After(time.Second): | |
} | |
// Check for limit increases after inputs behaved well for some time. | |
for source, lim := range inLimiters { | |
if lim.increaseTime.IsZero() { | |
continue | |
} | |
if lim.interval == defaultInterval { | |
continue | |
} | |
if time.Now().After(lim.increaseTime) { | |
newInterval := lim.interval / 2 | |
log.Infow("Decreasing interval", "source", source, "newInterval", newInterval) | |
lim.interval = newInterval | |
lim.lim.SetLimit(rate.Every(newInterval)) | |
lim.increaseTime = time.Now().Add(5 * time.Second) | |
} | |
} | |
} | |
} | |
var log *zap.SugaredLogger | |
func init() { | |
config := zap.NewDevelopmentConfig() | |
config.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder | |
config.EncoderConfig.EncodeCaller = nil | |
config.DisableStacktrace = true | |
rawLog, _ := config.Build() | |
log = rawLog.Sugar() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment