Skip to content

Instantly share code, notes, and snippets.

@lukearno
Last active October 1, 2015 21:06
Show Gist options
  • Save lukearno/59ae3b1699aeef54d8a5 to your computer and use it in GitHub Desktop.
Save lukearno/59ae3b1699aeef54d8a5 to your computer and use it in GitHub Desktop.
from collections import deque
from threading import RLock
import time
class LeakyBucket(object):
def __init__(self, times, seconds, it=None):
self.times = times
self.seconds = seconds
self._events = deque()
self._rlock = RLock()
self.it = it
def wait(self):
with self._rlock:
now = time.time()
cutoff = now - self.seconds
while self._events and self._events[0] < cutoff:
self._events.popleft()
if len(self._events) < self.times:
self._events.append(now)
return now
else:
# Will never exceed recursion depth of 1
# because the first thread will wait
# until the recursion condition is over.
# and the other threads will wait for the lock.
time.sleep(self._events[0] - cutoff)
return self.wait()
def __iter__(self):
for thing in self.it:
self.wait()
yield thing
if __name__ == '__main__':
from threading import Thread
_tprint_lock = RLock()
def tprint(s):
with _tprint_lock:
print s
limitter = LeakyBucket(5, 3)
def do_it(i):
for _ in range(5):
tprint("T" + str(i) + str(limitter.wait()))
threads = [Thread(target=do_it, args=(i,)) for i in range(10)]
[t.start() for t in threads]
[t.join() for t in threads]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment