Skip to content

Instantly share code, notes, and snippets.

@emicklei
Created November 2, 2016 10:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save emicklei/3340aa435a894915944c92ec4ff27bec to your computer and use it in GitHub Desktop.
Save emicklei/3340aa435a894915944c92ec4ff27bec to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Bucket holds a limited number of tokens which can be taken (immediately, or
// by waiting up to a timeout) and must be given back with Return.
//
// Use NewBucket to create one; the zero value is a bucket with no capacity.
// A Bucket is safe for concurrent use and must not be copied after first use.
type Bucket struct {
	// protect guards the fields following it.
	protect  sync.Mutex
	quantum  int64         // tokens currently available, 0 <= quantum <= capacity
	capacity int64         // maximum number of tokens the bucket can hold
	waiters  int           // number of goroutines blocked in TakeMaxDuration
	returned chan struct{} // closed and replaced by Return to wake every waiter
}

// NewBucket returns a Bucket initialized with cap tokens.
// A negative cap is treated as zero.
func NewBucket(cap int64) *Bucket {
	if cap < 0 {
		cap = 0
	}
	return &Bucket{
		quantum:  cap,
		capacity: cap,
		returned: make(chan struct{}),
	}
}

// String reports the available tokens (q), the capacity (c) and the number of
// goroutines currently waiting for tokens (w).
func (b *Bucket) String() string {
	b.protect.Lock()
	defer b.protect.Unlock()
	return fmt.Sprintf("q=%d ,c=%d, w=%d", b.quantum, b.capacity, b.waiters)
}

// takeLocked removes count tokens when count is positive and that many tokens
// are available, and reports whether it did. The caller must hold b.protect.
func (b *Bucket) takeLocked(count int64) bool {
	if count <= 0 || count > b.quantum {
		return false
	}
	b.quantum -= count
	return true
}

// Take takes count immediately available tokens from the bucket.
// It is all-or-nothing: it returns count when that many tokens were removed,
// or zero when fewer are available or count is not positive.
// It does not block.
func (b *Bucket) Take(count int64) int64 {
	b.protect.Lock()
	defer b.protect.Unlock()
	if !b.takeLocked(count) {
		return 0
	}
	return count
}

// Return returns count tokens to the bucket. You cannot return more tokens
// than the capacity of the bucket; the overflow is ignored. A count that is
// not positive is ignored as well. It does not block.
func (b *Bucket) Return(count int64) {
	if count <= 0 {
		return
	}
	b.protect.Lock()
	defer b.protect.Unlock()
	// Compare against the free room instead of adding first, so that a huge
	// count cannot overflow int64.
	if room := b.capacity - b.quantum; count > room {
		b.quantum = b.capacity
	} else {
		b.quantum += count
	}
	// Wake all waiters by closing the current channel. Closing never blocks,
	// so a waiter that timed out in the meantime cannot stall this call.
	if b.waiters > 0 {
		close(b.returned)
		b.returned = make(chan struct{})
	}
}

// TakeMaxDuration takes count tokens from the bucket, waiting for at most
// timeout until enough tokens have been returned.
// It returns count on success and 0 if not enough tokens became available in
// time. It returns 0 without waiting when count is not positive, when count
// exceeds the capacity of the bucket (such a request can never succeed), or
// when timeout is not positive and the tokens are not immediately available.
func (b *Bucket) TakeMaxDuration(count int64, timeout time.Duration) int64 {
	b.protect.Lock()
	if b.takeLocked(count) {
		b.protect.Unlock()
		return count
	}
	if count <= 0 || count > b.capacity || timeout <= 0 {
		b.protect.Unlock()
		return 0
	}
	// Register as a waiter and pick up the wake-up channel while still holding
	// the lock of the failed attempt, so that no Return can slip in between.
	b.waiters++
	wake := b.returned
	b.protect.Unlock()

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for {
		timedOut := false
		select {
		case <-deadline.C:
			timedOut = true
		case <-wake:
		}

		b.protect.Lock()
		// Also try once more on timeout: tokens may have been returned at the
		// very moment the deadline fired.
		ok := b.takeLocked(count)
		if ok || timedOut {
			b.waiters--
			b.protect.Unlock()
			if ok {
				return count
			}
			return 0
		}
		// Not enough tokens yet (another waiter may have been faster);
		// wait for the next Return.
		wake = b.returned
		b.protect.Unlock()
	}
}
/**
bucket := NewBucket(1000)
memory := bucket.TakeMaxDuration(500, 1*time.Second)
if memory != 0 {
// do work
bucket.Return(memory)
}
**/
// main demonstrates the Bucket: an immediate take and return, a request that
// exceeds the capacity, and two goroutines that wait for tokens which a third
// goroutine returns after a short delay.
func main() {
	b := NewBucket(1000)

	t := b.Take(100)
	log.Printf("%v,t=%d", b, t)
	b.Return(t)
	log.Printf("%v,t=%d", b, t)

	// More than the capacity can never be handed out, so this yields 0.
	h := b.TakeMaxDuration(2000, 1*time.Second)
	log.Printf("%v,h=%d", b, h)

	// Drain the bucket so that the takers below have to wait.
	log.Printf("%v,t=%d", b, b.Take(1000))

	// Join the goroutines explicitly instead of sleeping for a fixed time,
	// so main exits as soon as both are done and never before.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		time.Sleep(50 * time.Millisecond)
		b.Return(1000)
		log.Println("i returned 1000")
	}()
	go func() {
		defer wg.Done()
		log.Println("waiting for 500")
		h2 := b.TakeMaxDuration(500, 1*time.Second)
		log.Printf("%v,h2=%d", b, h2)
	}()

	log.Println("waiting for 500")
	h3 := b.TakeMaxDuration(500, 1*time.Second)
	log.Printf("%v,h3=%d", b, h3)

	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment