Skip to content

Instantly share code, notes, and snippets.

Created June 18, 2022 12:36
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
Leaky Bucket Algorithm in Go
package main
import (
	"fmt"
	"sync"
	"time"
)
// Task is an abstraction that represents a task that can be submitted to the
// rate limiter for processing. It is deliberately unconstrained so that plain
// values such as int64 can be used as tasks.
type Task interface{}

// RateLimiter controls the pace at which tasks of type T are processed.
type RateLimiter[T Task] interface {
	// Start initializes the rate limiter and starts the intake & processing of
	// tasks. Until it's called, no task is processed.
	Start()

	// Stop stops the intake & processing of tasks. Once Stop is called, until
	// Start is called again, the rate limiter stays inactive.
	Stop()
}

// lbLimiter is the leaky-bucket implementation of RateLimiter. The input
// channel plays the role of the bucket; tasks leak from it to the output
// channel with a pause of at least interval between two of them.
type lbLimiter[T Task] struct {
	mu       sync.RWMutex  // guards running and stop
	interval time.Duration // pause observed after each delivered task
	out      chan<- T
	in       <-chan T
	running  bool
	stop     chan struct{} // closed to end the current run; nil while idle
}

// isRunning reports whether the limiter is currently started.
func (l *lbLimiter[T]) isRunning() bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.running
}

// Start begins moving tasks from the input to the output channel. Calling
// Start on a limiter that is already running is a no-op, so the configured
// rate cannot be multiplied by accident.
func (l *lbLimiter[T]) Start() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running {
		return
	}
	// Every run gets its own stop channel so that a quick Stop/Start cycle
	// can never leave two active loops behind.
	stop := make(chan struct{})
	l.stop = stop
	l.running = true
	go l.run(stop)
}

// Stop ends the intake of tasks. A task that was already taken from the input
// channel is still delivered. Calling Stop on an idle limiter is a no-op.
func (l *lbLimiter[T]) Stop() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.running {
		return
	}
	close(l.stop)
	l.stop = nil
	l.running = false
}

// run is the delivery loop of a single Start/Stop cycle. It forwards one task
// at a time, in arrival order, and returns once stop is closed or the input
// channel is closed.
func (l *lbLimiter[T]) run(stop chan struct{}) {
	for {
		// A select picks randomly among ready cases, so check stop on its own
		// first: a stopped limiter must not take in another task.
		select {
		case <-stop:
			return
		default:
		}

		select {
		case <-stop:
			return
		case task, ok := <-l.in:
			if !ok {
				// A closed input would otherwise yield zero values forever.
				l.finish(stop)
				return
			}
			// Once taken, a task is always delivered, never dropped.
			l.out <- task
		}

		pause := time.NewTimer(l.interval)
		select {
		case <-pause.C:
		case <-stop:
			pause.Stop()
			return
		}
	}
}

// finish marks the limiter as idle when the run identified by stop ends on its
// own. It does nothing if that run has already been replaced or stopped.
func (l *lbLimiter[T]) finish(stop chan struct{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stop == stop {
		l.stop = nil
		l.running = false
	}
}

// NewLeakyBucketRateLimiter returns a RateLimiter that uses the "leaky bucket"
// algorithm to limit the rate at which tasks added to the input channel are
// processed. These tasks are passed to the output channel, in order, at a
// maximum rate denoted by "ratePerSecond". A rate of 0 is treated as 1 task
// per second. "input" & "output" may be buffered channels, which can be used
// to control how far the bucket can be filled and emptied concurrently.
//
// Once the input channel is closed, the rate limiter stops itself and outputs
// no further tasks. The output channel must not be closed while the limiter
// may still deliver a task, because sending on a closed channel panics.
func NewLeakyBucketRateLimiter[T Task](
	ratePerSecond uint,
	input <-chan T,
	output chan<- T,
) RateLimiter[T] {
	if ratePerSecond == 0 {
		// Avoid an integer division by zero below.
		ratePerSecond = 1
	}
	interval := time.Second / time.Duration(ratePerSecond)
	if interval <= 0 {
		// Rates above one task per nanosecond (or overflowing int64) cannot
		// be represented; fall back to the shortest positive pause.
		interval = time.Nanosecond
	}
	return &lbLimiter[T]{
		interval: interval,
		out:      output,
		in:       input,
	}
}
func main() {
out := make(chan int64)
in := make(chan int64, 10)
// Create a rate limiter that allows processing 5 tasks / second
rl := NewLeakyBucketRateLimiter[int64](5, in, out)
// Stop processing tasks in 10 seconds and exit the program
go func() {
time.Sleep(10 * time.Second)
// Try adding tasks at a rate of 10 tasks / seconds
go func() {
for i := 0; ; i++ {
ts := time.Now().Format(time.RFC3339)
select {
case in <- int64(i):
fmt.Printf("[%s] Added one item in the bucket\n", ts)
fmt.Printf("[%s] The bucket is full\n", ts)
time.Sleep(time.Millisecond * 100)
// Process tasks
t := time.Now()
for c := range out {
now := time.Now()
dt := now.Sub(t)
t = now
fmt.Printf("[%s] Ran the operation %d after %d milliseconds\n",
now.Format(time.RFC3339), c, dt.Milliseconds())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment