Last active
August 29, 2015 14:19
-
-
Save quatrix/6b177278ca9d192ef17f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
// basically just reimplemented (paritally) the TimeSpanSemaphore | |
// as specified here http://joelfillmore.com/throttling-web-api-calls/ | |
import ( | |
"container/heap" | |
"fmt" | |
"sync" | |
"time" | |
) | |
type empty struct{} | |
type semaphore chan empty | |
type TimeHeap []time.Time | |
func (h TimeHeap) Len() int { return len(h) } | |
func (h TimeHeap) Less(i, j int) bool { return h[i].Before(h[j]) } | |
func (h TimeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } | |
func (h *TimeHeap) Push(x interface{}) { | |
// Push and Pop use pointer receivers because they modify the slice's length, | |
// not just its contents. | |
*h = append(*h, x.(time.Time)) | |
} | |
func (h *TimeHeap) Pop() interface{} { | |
old := *h | |
n := len(old) | |
x := old[n-1] | |
*h = old[0 : n-1] | |
return x | |
} | |
type TSS struct { | |
// | |
pool semaphore | |
// the time span for the max number of callers | |
resetSpan time.Duration | |
// keep track of the release times | |
releaseTimes *TimeHeap | |
// protect release time queue | |
queueLock sync.Mutex | |
} | |
func NewTSS(maxCount int, resetSpan time.Duration) TSS { | |
th := &TimeHeap{} | |
for i := 0; i < maxCount; i++ { | |
*th = append(*th, time.Unix(0, 0)) | |
} | |
heap.Init(th) | |
return TSS{ | |
pool: make(semaphore, maxCount), | |
resetSpan: resetSpan, | |
releaseTimes: th, | |
} | |
} | |
// Blocks the current thread until it can enter the semaphore, while observing a CancellationToken | |
func (t TSS) getOldestRelease() time.Time { | |
t.queueLock.Lock() | |
defer t.queueLock.Unlock() | |
return heap.Pop(t.releaseTimes).(time.Time) | |
} | |
func (t TSS) enqueueNow() { | |
t.queueLock.Lock() | |
defer t.queueLock.Unlock() | |
heap.Push(t.releaseTimes, time.Now()) | |
} | |
func (t TSS) Wait() { | |
t.pool <- empty{} | |
// get the oldest release from the queue | |
oldestRelease := t.getOldestRelease() | |
// sleep until the time since the previous release equals the reset period | |
now := time.Now() | |
windowReset := oldestRelease.Add(t.resetSpan) | |
if windowReset.After(now) { | |
td := windowReset.UnixNano() - now.UnixNano() | |
fmt.Printf("waiting %s msecs", td) | |
<-time.After(time.Duration(td) * time.Nanosecond) | |
// check the cancelChan for cancallation | |
// | |
// Release(); | |
// cancelToken.ThrowIfCancellationRequested(); | |
} | |
} | |
/// Exits the semaphore | |
func (t TSS) Release() { | |
t.enqueueNow() | |
<-t.pool | |
} | |
func (t TSS) Run(signal chan bool) { | |
t.Wait() | |
defer t.Release() | |
signal <- true | |
} | |
func main() { | |
tss := NewTSS(1, time.Second) | |
wg := sync.WaitGroup{} | |
for i := 0; i < 100; i++ { | |
wg.Add(1) | |
go func() { | |
signal := make(chan bool) | |
go tss.Run(signal) | |
<-signal | |
fmt.Printf("[%s] yay\n", time.Now()) | |
wg.Done() | |
}() | |
} | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment