Skip to content

Instantly share code, notes, and snippets.

@ask
Created October 16, 2009 11:47
Show Gist options
  • Save ask/211766 to your computer and use it in GitHub Desktop.
Save ask/211766 to your computer and use it in GitHub Desktop.
class TokenBucketQueue(object):
"""An implementation of the token bucket algorithm.
See http://en.wikipedia.org/wiki/Token_Bucket
Most of this code was stolen from an entry in the ASPN Python Cookbook:
http://code.activestate.com/recipes/511490/
:param fill_rate: see :attr:`fill_rate`.
:keyword capacity: see :attr:`capacity`.
.. attribute:: fill_rate
The rate in tokens/second that the bucket will be refilled.
.. attribute:: capacity
Maximum number of tokens in the bucket. Default is ``1``.
.. attribute:: timestamp
Timestamp of the last time a token was taken out of the bucket.
"""
BucketRateExceeded = BucketRateExceeded
def __init__(self, fill_rate, queue=None, capacity=1):
self.capacity = float(capacity)
self._tokens = self.capacity
self.queue = queue
if not self.queue:
self.queue = Queue()
self.fill_rate = float(fill_rate)
self.timestamp = time.time()
def put(self, item, block=True):
"""Put an item into the queue.
Also see :meth:`Queue.Queue.put`.
"""
put = self.queue.put if block else self.queue.put_nowait
put(item)
def get(self, block=True):
"""Remove and return an item from the queue.
:raises BucketRateExceeded: If a token could not be consumed from the
token bucket (consuming from the queue too fast).
:raises Queue.Empty: If an item is not immediately available.
Also see :meth:`Queue.Queue.get`.
"""
get = self.queue.get if block else self.queue.get_nowait
if not self.can_consume(1):
raise BucketRateExceeded()
return get()
def get_nowait(self):
"""Remove and return an item from the queue without blocking.
:raises BucketRateExceeded: If a token could not be consumed from the
token bucket (consuming from the queue too fast).
:raises Queue.Empty: If an item is not immediately available.
Also see :meth:`Queue.Queue.get_nowait`."""
return self.get(block=True)
def put_nowait(self, item):
"""Put an item into the queue without blocking.
:raises Queue.Full: If a free slot is not immediately available.
Also see :meth:`Queue.Queue.put_nowait`
"""
return self.put(item, block=True)
def qsize(self):
"""Returns the size of the queue.
See :meth:`Queue.Queue.qsize`.
"""
return self.queue.qsize()
def wait(self, block=False):
"""Wait until a token can be retrieved from the bucket and return
the next item."""
while True:
remaining = self.expected_time()
if not remaining:
return self.get(block=block)
time.sleep(remaining)
def can_consume(self, tokens=1):
"""Consume tokens from the bucket. Returns True if there were
sufficient tokens otherwise False."""
if tokens <= self._get_tokens():
self._tokens -= tokens
return True
return False
def expected_time(self, tokens=1):
"""Returns the expected time in seconds when a new token should be
available."""
tokens = max(tokens, self._get_tokens())
return (tokens - self._get_tokens()) / self.fill_rate
def _get_tokens(self):
if self._tokens < self.capacity:
now = time.time()
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment