Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python generator rate limiter using token bucket
from time import time, sleep
# Convenience size presets (128 KiB, 256 KiB, 512 KiB and 1 MiB) intended to
# be passed to rate_limit() as the burst size and/or steady-state bandwidth.
_128k = 128 * 1024
_256k = 256 * 1024
_512k = 512 * 1024
_1024k = 1024 * 1024
class TokenBucket(object):
    """An implementation of the token bucket algorithm.

    The bucket starts full. It is refilled lazily: every read of the
    ``tokens`` property credits ``fill_rate`` tokens per elapsed second,
    capped at ``capacity``.

    >>> bucket = TokenBucket(80, 0.5)
    >>> print(bucket.consume(10))
    True

    adapted from http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/?in=lang-python
    Not thread safe.
    """

    __slots__ = ['capacity', '_tokens', 'fill_rate', 'timestamp']

    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)      # upper bound on stored tokens
        self._tokens = float(tokens)       # current level; bucket starts full
        self.fill_rate = float(fill_rate)  # tokens credited per second
        self.timestamp = time()            # moment of the last refill

    def consume(self, tokens, block=True):
        """Consume tokens from the bucket. Returns True if there were
        sufficient tokens.

        If there are not enough tokens and block is True, sleeps until the
        bucket is replenished enough to satisfy the deficiency.
        If there are not enough tokens and block is False, returns False.
        It is an error to consume more tokens than the bucket capacity
        (an AssertionError is raised while assertions are enabled).

        A blocking call that has to wait on a bucket whose fill_rate is 0
        raises ZeroDivisionError, because the wait time cannot be computed.
        """
        assert tokens <= self.capacity, \
            'Attempted to consume {} tokens from a bucket with capacity {}' \
            .format(tokens, self.capacity)

        if block:
            # Re-check after every sleep: a single sleep can leave the bucket
            # marginally short (timer granularity, float rounding), which
            # previously made a blocking call return False.
            while tokens > self.tokens:
                shortfall = tokens - self._tokens
                sleep(shortfall / self.fill_rate)

        if tokens <= self.tokens:
            self._tokens -= tokens
            return True
        return False

    @property
    def tokens(self):
        """Current token count, after crediting the refill earned so far."""
        if self._tokens < self.capacity:
            now = time()
            # time() follows the wall clock and may step backwards; never
            # let that turn into a negative refill.
            elapsed = max(0.0, now - self.timestamp)
            refill = self.fill_rate * elapsed
            self._tokens = min(self.capacity, self._tokens + refill)
            self.timestamp = now
        return self._tokens
class InfiniteTokenBucket(object):
    """TokenBucket implementation with infinite capacity, i.e. consume always
    returns True.

    Exposes the same ``consume`` method and ``tokens`` property as
    TokenBucket but never limits anything and keeps no state.
    """

    __slots__ = ()

    def __init__(self, tokens=None, fill_rate=None):
        """Accept and ignore the TokenBucket constructor arguments."""

    def consume(self, tokens, block=True):
        """Always succeed immediately; both arguments are ignored."""
        return True

    @property
    def tokens(self):
        """The bucket never runs dry: always positive infinity."""
        return float('infinity')
def rate_limit(data, bandwidth_or_burst, steady_state_bandwidth=None):
    """Limit the bandwidth of a generator.

    Given a data generator, return a generator that yields the data at no
    higher than the specified bandwidth. For example, ``rate_limit(data, _256k)``
    will yield from data at no higher than 256KB/s.

    The three argument form distinguishes burst from steady-state bandwidth,
    so ``rate_limit(data, _1024k, _128k)`` would allow data to be consumed at
    128KB/s with an initial burst of 1MB.

    Each item costs one token per byte for ``bytes``/``bytearray`` chunks and
    ``len(str(item))`` tokens for anything else. An item costing more than
    the burst size trips the assertion in ``TokenBucket.consume``.
    """
    # A falsy steady-state value (None or 0) means "same as the burst size".
    bandwidth = steady_state_bandwidth or bandwidth_or_burst
    rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth)
    for thing in data:
        if isinstance(thing, (bytes, bytearray)):
            # str() of a bytes object on Python 3 is its repr ("b'...'"),
            # which would overstate the chunk size.
            cost = len(thing)
        else:
            cost = len(str(thing))
        rate_limiter.consume(cost)
        yield thing
@hanleilei
Copy link

hanleilei commented Dec 24, 2018

awful format..

@ruanima
Copy link

ruanima commented May 30, 2021

reformat

# -*- coding:utf-8 -*-

from time import time, sleep


# Convenience size presets (128 KiB, 256 KiB, 512 KiB and 1 MiB) intended to
# be passed to rate_limit() as the burst size and/or steady-state bandwidth.
_128k = 128 * 1024
_256k = 256 * 1024
_512k = 512 * 1024
_1024k = 1024 * 1024


class TokenBucket(object):
    """An implementation of the token bucket algorithm.

    The bucket starts full. It is refilled lazily: every read of the
    ``tokens`` property credits ``fill_rate`` tokens per elapsed second,
    capped at ``capacity``.

    >>> bucket = TokenBucket(80, 0.5)
    >>> print(bucket.consume(10))
    True

    adapted from http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/?in=lang-python
    Not thread safe.
    """

    __slots__ = ['capacity', '_tokens', 'fill_rate', 'timestamp']

    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)      # bucket capacity
        self._tokens = float(tokens)       # current level; bucket starts full
        self.fill_rate = float(fill_rate)  # token production rate (per second)
        self.timestamp = time()            # moment of the last refill

    def consume(self, tokens, block=True):
        """Consume tokens from the bucket. Returns True if there were
        sufficient tokens.

        If there are not enough tokens and block is True, sleeps until the
        bucket is replenished enough to satisfy the deficiency.
        If there are not enough tokens and block is False, returns False.
        It is an error to consume more tokens than the bucket capacity
        (an AssertionError is raised while assertions are enabled).

        A blocking call that has to wait on a bucket whose fill_rate is 0
        raises ZeroDivisionError, because the wait time cannot be computed.
        """

        assert tokens <= self.capacity, \
            'Attempted to consume {} tokens from a bucket with capacity {}' \
            .format(tokens, self.capacity)

        if block:
            # Re-check after every sleep: a single sleep can leave the bucket
            # marginally short (timer granularity, float rounding), which
            # previously made a blocking call return False.
            while tokens > self.tokens:
                deficit = tokens - self._tokens
                delay = deficit / self.fill_rate

                print('Have {} tokens, need {}; sleeping {} seconds'.format(self._tokens, tokens, delay))
                sleep(delay)

        if tokens <= self.tokens:
            self._tokens -= tokens
            return True
        return False

    @property
    def tokens(self):
        """Current token count, after crediting the refill earned so far."""
        if self._tokens < self.capacity:
            now = time()  # current time
            # time() follows the wall clock and may step backwards; never
            # let that turn into a negative refill.
            elapsed = max(0.0, now - self.timestamp)
            delta = self.fill_rate * elapsed  # tokens produced in the interval
            # Tokens beyond the capacity are discarded.
            self._tokens = min(self.capacity, self._tokens + delta)
            self.timestamp = now  # move the reference time forward
        return self._tokens


class InfiniteTokenBucket(object):
    """A bucket that never runs out of tokens.

    Exposes the same ``consume`` method and ``tokens`` property as
    TokenBucket, but every consume succeeds and no state is kept.
    """

    __slots__ = ()

    @property
    def tokens(self):
        """Available tokens: always positive infinity."""
        return float('infinity')

    def __init__(self, tokens=None, fill_rate=None):
        """Accept and ignore the TokenBucket constructor arguments."""

    def consume(self, tokens, block=True):
        """Report success without waiting; both arguments are ignored."""
        return True


def rate_limit(data, bandwidth_or_burst, steady_state_bandwidth=None):
    """Limit the bandwidth of a generator.

    Given a data generator, return a generator that yields the data at no
    higher than the specified bandwidth.  For example, ``rate_limit(data, _256k)``
    will yield from data at no higher than 256KB/s.

    The three argument form distinguishes burst from steady-state bandwidth,
    so ``rate_limit(data, _1024k, _128k)`` would allow data to be consumed at
    128KB/s with an initial burst of 1MB.

    Each item costs one token per byte for ``bytes``/``bytearray`` chunks and
    ``len(str(item))`` tokens for anything else. An item costing more than
    the burst size trips the assertion in ``TokenBucket.consume``.
    """

    # A falsy steady-state value (None or 0) means "same as the burst size".
    bandwidth = steady_state_bandwidth or bandwidth_or_burst
    rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth)

    for thing in data:
        if isinstance(thing, (bytes, bytearray)):
            # str() of a bytes object on Python 3 is its repr ("b'...'"),
            # which would overstate the chunk size.
            cost = len(thing)
        else:
            cost = len(str(thing))
        rate_limiter.consume(cost)
        yield thing


if __name__ == '__main__':
    # Demo: push the integers 0..9 through a limiter with a burst of 5
    # tokens and a steady rate of 1 token/second, printing a timestamp next
    # to each value as it is released.
    for value in rate_limit(range(10), 5, 1):
        print(time(), value)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment