-
-
Save gh-codejitsu/7bfa2b2153be57f199a844bde240c3fa to your computer and use it in GitHub Desktop.
Leaky Bucket Algorithm in Go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"sync" | |
"time" | |
) | |
// Task is the constraint satisfied by every value that can be submitted to a
// rate limiter for processing. It declares no methods, so any type qualifies.
type Task any
type RateLimiter[T Task] interface { | |
// Start initializes the rate limiter and starts the intake & processing of | |
// tasks. Until it's called, no task is processed. | |
Start() | |
// Stop stops the intake & processing of tasks. Once Stop is called, until Start | |
// is called again, the rate limiter stays inactive. | |
Stop() | |
} | |
type lbLimiter[T Task] struct { | |
sync.RWMutex | |
interval time.Duration | |
pollInterval time.Duration | |
out chan<- T | |
in <-chan T | |
running bool | |
} | |
func (l *lbLimiter[T]) isRunning() bool { | |
l.RLock() | |
defer l.RUnlock() | |
return l.running | |
} | |
func (l *lbLimiter[T]) Start() { | |
l.Lock() | |
defer l.Unlock() | |
l.running = true | |
go func() { | |
for l.isRunning() { | |
select { | |
case w := <-l.in: | |
go func() { l.out <- w }() | |
time.Sleep(l.interval) | |
default: | |
time.Sleep(l.pollInterval) | |
} | |
} | |
}() | |
} | |
func (l *lbLimiter[T]) Stop() { | |
l.Lock() | |
defer l.Unlock() | |
l.running = false | |
} | |
// NewLeakyBucketRateLimiter returns a RateLimiter that uses the "leaky bucket" | |
// algorithm to limit the rate tasks added to the input channel are being | |
// processed. These tasks are passed to the output channel at a maximum rate | |
// denoted by "ratePerSecond". "input" & "output" may be a buffered channels | |
// that can be used to control the max concurrency the bucket can be filled | |
// and emptied. | |
// | |
// Once either of these channels are closed, the rate limiter will | |
// not be able to rate limit & output further tasks. | |
func NewLeakyBucketRateLimiter[T Task]( | |
ratePerSecond uint, | |
input <-chan T, | |
output chan<- T, | |
) RateLimiter[T] { | |
outInterval := time.Second / time.Duration(ratePerSecond) | |
return &lbLimiter[T]{ | |
interval: outInterval, | |
out: output, | |
in: input, | |
pollInterval: outInterval / 10, | |
} | |
} | |
func main() { | |
out := make(chan int64) | |
in := make(chan int64, 10) | |
// Create a rate limiter that allows processing 5 tasks / second | |
rl := NewLeakyBucketRateLimiter[int64](5, in, out) | |
rl.Start() | |
// Stop processing tasks in 10 seconds and exit the program | |
go func() { | |
time.Sleep(10 * time.Second) | |
rl.Stop() | |
close(out) | |
}() | |
// Try adding tasks at a rate of 10 tasks / seconds | |
go func() { | |
for i := 0; ; i++ { | |
ts := time.Now().Format(time.RFC3339) | |
select { | |
case in <- int64(i): | |
fmt.Printf("[%s] Added one item in the bucket\n", ts) | |
default: | |
fmt.Printf("[%s] The bucket is full\n", ts) | |
} | |
time.Sleep(time.Millisecond * 100) | |
} | |
}() | |
// Process tasks | |
t := time.Now() | |
for c := range out { | |
now := time.Now() | |
dt := now.Sub(t) | |
t = now | |
fmt.Printf("[%s] Ran the operation %d after %d milliseconds\n", | |
now.Format(time.RFC3339), c, dt.Milliseconds()) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment