Skip to content

Instantly share code, notes, and snippets.

@quatrix
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save quatrix/6b177278ca9d192ef17f to your computer and use it in GitHub Desktop.
Save quatrix/6b177278ca9d192ef17f to your computer and use it in GitHub Desktop.
package main
// basically just reimplemented (partially) the TimeSpanSemaphore
// as specified here http://joelfillmore.com/throttling-web-api-calls/
import (
"container/heap"
"fmt"
"sync"
"time"
)
type (
	// empty is a zero-size token; only its presence in a channel matters.
	empty struct{}

	// semaphore is a counting semaphore built on a buffered channel: sending
	// an empty value takes a slot and receiving one gives it back.
	semaphore chan empty
)
// TimeHeap is a slice of instants that provides the Len/Less/Swap/Push/Pop
// method set expected by container/heap, ordered so the earliest instant is
// at the root.
type TimeHeap []time.Time

// Len reports how many instants are stored.
func (h TimeHeap) Len() int {
	return len(h)
}

// Less reports whether the instant at index i is strictly earlier than the
// one at index j.
func (h TimeHeap) Less(i, j int) bool {
	return h[j].After(h[i])
}

// Swap exchanges the instants at indices i and j.
func (h TimeHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be a time.Time, to the end of the slice.
// It takes a pointer receiver because it changes the slice's length.
func (h *TimeHeap) Push(x interface{}) {
	when := x.(time.Time)
	*h = append(*h, when)
}

// Pop removes and returns the last element of the slice.
// It takes a pointer receiver because it changes the slice's length.
func (h *TimeHeap) Pop() interface{} {
	items := *h
	last := len(items) - 1
	tail := items[last]
	*h = items[:last]
	return tail
}
// TSS is a time-span semaphore: it limits both the number of concurrent
// holders and how soon a slot may be reused after its previous release.
//
// A TSS is passed around and used by value (every method has a value
// receiver), so all state that must be shared between copies is held behind
// a channel or a pointer.
type TSS struct {
	// pool bounds the number of concurrent holders; its capacity is the
	// maxCount given to NewTSS.
	pool semaphore
	// resetSpan is the time span for the max number of callers.
	resetSpan time.Duration
	// releaseTimes keeps track of the release times, earliest first.
	releaseTimes *TimeHeap
	// queueLock protects releaseTimes. It is a pointer so that every copy of
	// the TSS locks the same mutex; a sync.Mutex stored by value would be
	// duplicated on each copy and protect nothing.
	queueLock *sync.Mutex
}

// NewTSS returns a TSS that admits at most maxCount holders per resetSpan.
//
// The release-time heap is seeded with maxCount entries at the Unix epoch,
// one per slot, so the first maxCount callers find a release time that is
// (for any realistic resetSpan) already older than resetSpan.
func NewTSS(maxCount int, resetSpan time.Duration) TSS {
	pool := make(semaphore, maxCount)

	th := make(TimeHeap, maxCount)
	epoch := time.Unix(0, 0)
	for i := range th {
		th[i] = epoch
	}
	heap.Init(&th)

	return TSS{
		pool:         pool,
		resetSpan:    resetSpan,
		releaseTimes: &th,
		queueLock:    &sync.Mutex{},
	}
}
// getOldestRelease removes and returns the earliest entry of the
// release-time heap, holding queueLock for the duration of the pop.
func (t TSS) getOldestRelease() time.Time {
	t.queueLock.Lock()
	defer t.queueLock.Unlock()

	oldest := heap.Pop(t.releaseTimes)
	return oldest.(time.Time)
}
// enqueueNow pushes the current time onto the release-time heap, holding
// queueLock for the duration of the push.
func (t TSS) enqueueNow() {
	t.queueLock.Lock()
	defer t.queueLock.Unlock()

	released := time.Now()
	heap.Push(t.releaseTimes, released)
}
// Wait blocks the calling goroutine until it may enter the semaphore.
//
// It first takes a slot from the pool, then pops the oldest recorded release
// time and, if less than resetSpan has passed since that release, sleeps for
// the remainder. Each successful Wait must be paired with a Release.
//
// TODO: there is no cancellation support. The implementation this was ported
// from releases the slot and checks a cancellation token after the sleep.
func (t TSS) Wait() {
	t.pool <- empty{}

	// get the oldest release from the queue
	oldestRelease := t.getOldestRelease()

	// sleep until the time since the previous release equals the reset period
	windowReset := oldestRelease.Add(t.resetSpan)
	if remaining := windowReset.Sub(time.Now()); remaining > 0 {
		// remaining is a time.Duration, so %s prints it with its own unit.
		fmt.Printf("waiting %s\n", remaining)
		time.Sleep(remaining)
	}
}
// Release exits the semaphore. The release time is recorded before the pool
// slot is freed, so the next caller to take the slot finds an entry on the
// release-time heap.
func (t TSS) Release() {
	// Record "now" as this slot's release time.
	t.queueLock.Lock()
	heap.Push(t.releaseTimes, time.Now())
	t.queueLock.Unlock()

	// Hand the slot back to the pool.
	<-t.pool
}
// Run enters the semaphore, sends true on signal, and leaves the semaphore
// once that send has completed. The send blocks until the value is accepted,
// so the slot stays held for that long.
func (t TSS) Run(signal chan bool) {
	t.Wait()
	// Deferred so the slot is returned however the send below ends.
	defer t.Release()

	signal <- true
}
// main demonstrates the throttle: 100 goroutines share a TSS configured for
// one holder per second, and each prints a timestamp once it has been let
// through.
func main() {
	tss := NewTSS(1, time.Second)

	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// Run signals on this channel once it has entered the semaphore.
			signal := make(chan bool)
			go tss.Run(signal)
			<-signal
			fmt.Printf("[%s] yay\n", time.Now())
		}()
	}
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment