Created
October 16, 2009 11:47
-
-
Save ask/211766 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TokenBucketQueue(object):
    """A queue whose consumption is rate limited by a token bucket.

    See http://en.wikipedia.org/wiki/Token_Bucket

    Most of this code was stolen from an entry in the ASPN Python Cookbook:
    http://code.activestate.com/recipes/511490/

    :param fill_rate: see :attr:`fill_rate`.
    :keyword queue: the underlying queue object. A new ``Queue()`` is
        created when this is ``None``.
    :keyword capacity: see :attr:`capacity`.

    .. attribute:: fill_rate

        The rate in tokens/second that the bucket will be refilled.

    .. attribute:: capacity

        Maximum number of tokens in the bucket. Default is ``1``.

    .. attribute:: timestamp

        Timestamp of the last time the token count was brought up to date.

    """
    BucketRateExceeded = BucketRateExceeded

    def __init__(self, fill_rate, queue=None, capacity=1):
        self.capacity = float(capacity)
        # The bucket starts out full.
        self._tokens = self.capacity
        # Compare against None rather than truthiness: a caller-supplied
        # queue object that happens to be falsy must not be thrown away.
        self.queue = Queue() if queue is None else queue
        self.fill_rate = float(fill_rate)
        self.timestamp = time.time()

    def put(self, item, block=True):
        """Put an item into the queue.

        :param item: the object to enqueue.
        :keyword block: when true use the queue's ``put``, otherwise its
            ``put_nowait``.

        Also see :meth:`Queue.Queue.put`.

        """
        put = self.queue.put if block else self.queue.put_nowait
        put(item)

    def get(self, block=True):
        """Remove and return an item from the queue.

        One token is taken from the bucket per call. If the underlying
        queue raises instead of returning an item, the token is handed back
        so a failed retrieval does not use up rate budget.

        :keyword block: when true use the queue's ``get``, otherwise its
            ``get_nowait``.

        :raises BucketRateExceeded: If a token could not be consumed from the
            token bucket (consuming from the queue too fast).
        :raises Queue.Empty: If an item is not immediately available.

        Also see :meth:`Queue.Queue.get`.

        """
        get = self.queue.get if block else self.queue.get_nowait

        if not self.can_consume(1):
            raise BucketRateExceeded()

        try:
            return get()
        except Exception:
            # Nothing was delivered: refund the token taken above.
            self._tokens = min(self.capacity, self._tokens + 1)
            raise

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        :raises BucketRateExceeded: If a token could not be consumed from the
            token bucket (consuming from the queue too fast).
        :raises Queue.Empty: If an item is not immediately available.

        Also see :meth:`Queue.Queue.get_nowait`.

        """
        return self.get(block=False)

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        :raises Queue.Full: If a free slot is not immediately available.

        Also see :meth:`Queue.Queue.put_nowait`

        """
        return self.put(item, block=False)

    def qsize(self):
        """Returns the size of the queue.

        See :meth:`Queue.Queue.qsize`.

        """
        return self.queue.qsize()

    def wait(self, block=False):
        """Wait until a token can be retrieved from the bucket and return
        the next item.

        Sleeps for :meth:`expected_time` seconds until that reports no
        remaining wait, then delegates to :meth:`get`, whose exceptions
        propagate to the caller.

        :keyword block: passed on to :meth:`get`.

        """
        while True:
            remaining = self.expected_time()
            if remaining <= 0:
                return self.get(block=block)
            time.sleep(remaining)

    def can_consume(self, tokens=1):
        """Consume tokens from the bucket. Returns True if there were
        sufficient tokens otherwise False.

        :keyword tokens: number of tokens to take. Nothing is taken when
            fewer than this many are available.

        """
        if tokens <= self._get_tokens():
            self._tokens -= tokens
            return True
        return False

    def expected_time(self, tokens=1):
        """Returns the expected time in seconds until ``tokens`` tokens
        should be available.

        Returns ``0.0`` when enough tokens are available already.

        :keyword tokens: number of tokens the caller wants to consume.

        :raises ZeroDivisionError: If tokens are missing and
            :attr:`fill_rate` is ``0``.

        """
        # Read the bucket exactly once: every read may refill it, so a
        # second read could exceed the first and yield a negative wait.
        missing = tokens - self._get_tokens()
        if missing <= 0:
            return 0.0
        return missing / self.fill_rate

    def _get_tokens(self):
        """Refill the bucket for the time elapsed since :attr:`timestamp`
        and return the number of tokens now available."""
        now = time.time()
        if self._tokens < self.capacity:
            # Clamp at zero so a wall clock stepping backwards cannot
            # remove tokens.
            elapsed = max(0.0, now - self.timestamp)
            refill = self.fill_rate * elapsed
            self._tokens = min(self.capacity, self._tokens + refill)
        # Always move the timestamp forward, even when the bucket is full.
        # Otherwise time spent idle at capacity would be credited again
        # right after the next consume, allowing a burst above capacity.
        self.timestamp = now
        return self._tokens
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment