Skip to content

Instantly share code, notes, and snippets.

@alexrios
Last active May 29, 2021 21:08
Show Gist options
  • Save alexrios/f40c0ceda57b40ac8819011eb6c93bc8 to your computer and use it in GitHub Desktop.
Save alexrios/f40c0ceda57b40ac8819011eb6c93bc8 to your computer and use it in GitHub Desktop.
Ticker Based Leaky Bucket
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// ErrBucketReachedCap is returned by Refill when the bucket is already at full capacity.
var ErrBucketReachedCap = errors.New("bucket reached its max capacity")
// tickLeakyBucket is a leaky bucket whose level is drained by a goroutine
// driven by a time.Ticker.
type tickLeakyBucket struct {
	// mu guards currentVolume.
	mu sync.Mutex
	// currentVolume is the number of units currently held in the bucket.
	currentVolume int
	// fullBucket is the capacity; Refill refuses to go past it.
	fullBucket int

	// done carries the stop signal to the ticker goroutine.
	done chan struct{}
	// fixOnce ensures the body of Fix is executed at most once.
	fixOnce sync.Once
}
// NewTickLeakyBucket returns an empty leaky bucket that holds at most capacity
// units and drains one unit on every tick of freq while it is not empty.
//
// A background goroutine performs the draining; it runs until Fix is called,
// so callers should call Fix once the bucket is no longer needed.
// freq is passed directly to time.NewTicker, which panics on a non-positive
// duration.
func NewTickLeakyBucket(capacity int, freq time.Duration) *tickLeakyBucket {
	b := &tickLeakyBucket{
		fullBucket: capacity,
		done:       make(chan struct{}),
	}

	// Create the ticker before starting the goroutine so that an invalid freq
	// panics in the caller rather than in the background goroutine.
	drip := time.NewTicker(freq)
	go func() {
		defer drip.Stop()
		for {
			select {
			case <-drip.C:
				b.leak()
			case <-b.done:
				return
			}
		}
	}()

	return b
}
// leak removes one unit from the bucket, doing nothing when it is already empty.
func (c *tickLeakyBucket) leak() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.currentVolume <= 0 {
		return
	}
	c.currentVolume--
}
// Size reports how many units the bucket currently holds.
func (c *tickLeakyBucket) Size() int {
	c.mu.Lock()
	volume := c.currentVolume
	c.mu.Unlock()
	return volume
}
// Refill adds a single unit to the bucket.
// It returns ErrBucketReachedCap, leaving the volume untouched, when the
// bucket is already at capacity.
func (c *tickLeakyBucket) Refill() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.currentVolume >= c.fullBucket {
		return ErrBucketReachedCap
	}
	c.currentVolume++
	return nil
}
// Fix stops the bucket from leaking ever again.
// Only the first call sends the stop signal on done; later calls do not send
// again.
func (c *tickLeakyBucket) Fix() {
	stop := func() {
		// A send on done waits for a receiver when the channel is unbuffered.
		c.done <- struct{}{}
	}
	c.fixOnce.Do(stop)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment