Skip to content

Instantly share code, notes, and snippets.

@gh-codejitsu
Created June 18, 2022 12:36
Show Gist options
  • Save gh-codejitsu/7bfa2b2153be57f199a844bde240c3fa to your computer and use it in GitHub Desktop.
Save gh-codejitsu/7bfa2b2153be57f199a844bde240c3fa to your computer and use it in GitHub Desktop.
Leaky Bucket Algorithm in Go
package main
import (
"fmt"
"sync"
"time"
)
// Task is an abstraction that represents a task that can be submitted to the rate
// limiter for processing.
type Task interface {
}
type RateLimiter[T Task] interface {
// Start initializes the rate limiter and starts the intake & processing of
// tasks. Until it's called, no task is processed.
Start()
// Stop stops the intake & processing of tasks. Once Stop is called, until Start
// is called again, the rate limiter stays inactive.
Stop()
}
type lbLimiter[T Task] struct {
sync.RWMutex
interval time.Duration
pollInterval time.Duration
out chan<- T
in <-chan T
running bool
}
func (l *lbLimiter[T]) isRunning() bool {
l.RLock()
defer l.RUnlock()
return l.running
}
func (l *lbLimiter[T]) Start() {
l.Lock()
defer l.Unlock()
l.running = true
go func() {
for l.isRunning() {
select {
case w := <-l.in:
go func() { l.out <- w }()
time.Sleep(l.interval)
default:
time.Sleep(l.pollInterval)
}
}
}()
}
func (l *lbLimiter[T]) Stop() {
l.Lock()
defer l.Unlock()
l.running = false
}
// NewLeakyBucketRateLimiter returns a RateLimiter that uses the "leaky bucket"
// algorithm to limit the rate tasks added to the input channel are being
// processed. These tasks are passed to the output channel at a maximum rate
// denoted by "ratePerSecond". "input" & "output" may be a buffered channels
// that can be used to control the max concurrency the bucket can be filled
// and emptied.
//
// Once either of these channels are closed, the rate limiter will
// not be able to rate limit & output further tasks.
func NewLeakyBucketRateLimiter[T Task](
ratePerSecond uint,
input <-chan T,
output chan<- T,
) RateLimiter[T] {
outInterval := time.Second / time.Duration(ratePerSecond)
return &lbLimiter[T]{
interval: outInterval,
out: output,
in: input,
pollInterval: outInterval / 10,
}
}
func main() {
out := make(chan int64)
in := make(chan int64, 10)
// Create a rate limiter that allows processing 5 tasks / second
rl := NewLeakyBucketRateLimiter[int64](5, in, out)
rl.Start()
// Stop processing tasks in 10 seconds and exit the program
go func() {
time.Sleep(10 * time.Second)
rl.Stop()
close(out)
}()
// Try adding tasks at a rate of 10 tasks / seconds
go func() {
for i := 0; ; i++ {
ts := time.Now().Format(time.RFC3339)
select {
case in <- int64(i):
fmt.Printf("[%s] Added one item in the bucket\n", ts)
default:
fmt.Printf("[%s] The bucket is full\n", ts)
}
time.Sleep(time.Millisecond * 100)
}
}()
// Process tasks
t := time.Now()
for c := range out {
now := time.Now()
dt := now.Sub(t)
t = now
fmt.Printf("[%s] Ran the operation %d after %d milliseconds\n",
now.Format(time.RFC3339), c, dt.Milliseconds())
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment