Created
September 28, 2014 15:42
-
-
Save dufferzafar/d6b4f8fb44ed80608a44 to your computer and use it in GitHub Desktop.
Rate Limit Read: http://pastie.org/3120175#2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This snippet is based on http://stackoverflow.com/questions/456649/throttling-with-urllib2
import io | |
import time | |
import threading | |
class TokenBucket(object):
    """An implementation of the token bucket algorithm.

    source: http://code.activestate.com/recipes/511490/

    The bucket starts full, never holds more than ``capacity`` tokens and
    is topped up at ``fill_rate`` tokens per second whenever its level is
    read.

    >>> bucket = TokenBucket(80, 0.5)
    >>> bucket.consume(10)
    0.0
    >>> bucket.consume(90) > 0
    True
    """

    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        # Moment of the last refill computation (time.time() seconds).
        self.timestamp = time.time()
        # Re-entrant: consume() reads the ``tokens`` property while already
        # holding the lock, and the property takes the lock again.
        self.lock = threading.RLock()

    def consume(self, tokens):
        """Consume tokens from the bucket.

        Returns 0 if there were sufficient tokens (exactly ``tokens`` are
        then removed), otherwise the expected time in seconds until
        enough tokens become available; in that case nothing is removed.

        A request larger than ``capacity`` can never succeed, because the
        level is capped at ``capacity``.

        Raises ZeroDivisionError when tokens are insufficient and
        ``fill_rate`` is 0.
        """
        # ``with`` guarantees the lock is released even if the division
        # below raises.
        with self.lock:
            available = self.tokens
            if tokens <= available:
                # Take only what was asked for; the remainder stays in
                # the bucket as burst allowance.
                self._tokens -= tokens
                return 0.0
            return (tokens - available) / self.fill_rate

    @property
    def tokens(self):
        """Current number of tokens, after crediting the refill earned
        since the previous reading (capped at ``capacity``)."""
        with self.lock:
            now = time.time()
            if self._tokens < self.capacity:
                # The wall clock may step backwards; never let that
                # remove tokens.
                elapsed = max(0.0, now - self.timestamp)
                self._tokens = min(self.capacity,
                                   self._tokens + self.fill_rate * elapsed)
            # Always advance the timestamp, so time spent sitting at full
            # capacity is not credited again after the next consume().
            self.timestamp = now
            return self._tokens
class RateLimit(object):
    """Rate limit a url fetch.

    source: http://mail.python.org/pipermail/python-list/2008-January/472859.html
    (but mostly rewritten)

    Instances are callables taking ``(block_count, block_size,
    total_size)``. Each call blocks until ``bucket`` grants
    ``block_size / 1024.`` tokens, then refreshes a smoothed transfer
    rate kept in ``avg_rate``.
    """

    def __init__(self, bucket):
        # Any object whose consume(n) returns 0 on success or a wait in
        # seconds otherwise.
        self.bucket = bucket
        # time.time() of the previous call; 0 means "never called".
        self.last_update = 0
        # Cumulative amount (block_count * block_size / 1024.) seen at
        # the previous call.
        self.last_downloaded_kb = 0
        # Exponentially smoothed rate; None until a first sample exists.
        self.avg_rate = None

    def __call__(self, block_count, block_size, total_size):
        """Throttle for one block and update the rate statistics.

        ``total_size`` is accepted but not used.
        """
        total_kb = (block_count * block_size) / 1024.
        fresh_kb = total_kb - self.last_downloaded_kb
        self.last_downloaded_kb = total_kb

        # Keep asking the bucket for one block's worth of tokens,
        # sleeping for the suggested time after every refusal.
        request_kb = block_size / 1024.
        while True:
            pause = self.bucket.consume(request_kb)
            if not (pause > 0):
                break
            time.sleep(pause)

        now = time.time()
        elapsed = now - self.last_update
        # The first call has no previous timestamp, and a zero interval
        # yields no usable sample; both leave avg_rate untouched.
        if self.last_update != 0 and elapsed > 0:
            sample = fresh_kb / elapsed
            if self.avg_rate is None:
                self.avg_rate = sample
            else:
                self.avg_rate = 0.9 * self.avg_rate + 0.1 * sample
        self.last_update = now
def read_limiting_rate(rfp, kbps, wfp=None):
    """Read ``rfp`` until it is exhausted, throttled by a token bucket.

    rfp  -- readable ``io.RawIOBase`` stream; read in 4096-byte blocks.
    kbps -- refill rate handed to the bucket. Tokens are counted as
            ``bytes / 1024.``, so this is kilobytes per second.
    wfp  -- optional writable ``io.IOBase`` stream. When given, every
            block is written to it instead of being collected.

    Returns a ``bytearray`` with everything read when ``wfp`` is None,
    otherwise None.

    Reading stops at the first empty (or None) read, or as soon as
    ``rfp`` reports itself closed.
    """
    assert isinstance(rfp, io.RawIOBase) and rfp.readable()
    if wfp is not None:
        assert isinstance(wfp, io.IOBase) and wfp.writable()

    block_size = 4096
    block_count = 0

    # Allow bursts of ten seconds' worth of data, but never less than one
    # block: the limiter asks for a whole block at a time, and a request
    # above the bucket's capacity is never granted, so a smaller bucket
    # would make the limiter wait forever for low kbps values.
    capacity = max(10 * kbps, block_size / 1024.)
    bucket = TokenBucket(capacity, kbps)
    rate_limiter = RateLimit(bucket)

    data = bytearray()

    # Initial call with a count of zero, before any data has been read.
    rate_limiter(block_count, block_size, -1)
    while True:
        block = rfp.read(block_size)
        if not block or rfp.closed:
            break
        if wfp is None:
            data.extend(block)
        else:
            wfp.write(block)
        block_count += 1
        # Charge the bucket for the number of bytes actually read.
        rate_limiter(block_count, len(block), -1)

    if wfp is None:
        return data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment