Last active
May 24, 2017 07:21
-
-
Save tevino/d303295c9567c43ee9291864fafd8a0a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import logging
import random
import sys
import time
from collections import Counter, defaultdict

import gevent
from gevent.monkey import patch_all
from gevent.pool import Pool
from redis import StrictRedis
# Process-wide tally of simulated request outcomes; the rest of the script
# reads and writes the "successful" and "failed" keys.
counter = Counter()
# Shared Redis client, constructed with redis-py's default connection settings.
redis = StrictRedis()
class Limiter(object):
    """Approximate, Redis-backed rate limiter driven by a background greenlet.

    Calls are counted locally through :meth:`incr`. Once per ``interval``
    seconds the background loop pushes the local counts to Redis, reads the
    shared totals back and caches them. :meth:`is_over` only consults that
    cached snapshot, so it never talks to Redis itself.
    """

    # INCRBY the key, then set a TTL only when the key has none yet, so later
    # increments do not keep extending the lifetime of the counter.
    _lua_incr = """
    redis.call("incrby",KEYS[1],ARGV[1])
    if redis.call("ttl",KEYS[1]) < 0 then
        redis.call("expire",KEYS[1],ARGV[2])
    end
    """

    # Lifetime, in seconds, given to a freshly created Redis counter key.
    _expire_seconds = 1

    def __init__(self, limit, interval=1):
        """Create a limiter.

        :param limit: number of calls per key above which calls start being
            rejected.
        :param interval: seconds between two synchronisations with Redis.
        """
        self._call_counter = Counter()
        self._overall_call_counter = {}
        self._limit = limit
        # The original hard-coded 1.0 here and silently ignored ``interval``.
        self._interval = float(interval)
        self._running = False
        self._rand = random.Random(time.time() + 42)
        # Totals seen at the previous sampling point, used to compute deltas.
        self._tmp = defaultdict(int)

    def start(self):
        """Spawn the background synchronisation loop (at most once)."""
        if self._running:
            logging.warning("Already started")
            return
        self._running = True
        gevent.spawn(self._loop)

    def stop(self):
        """Ask the background loop to exit after its current iteration."""
        self._running = False

    def _loop(self):
        """Push local counts, pull shared totals, record a sample, sleep."""
        while self._running:
            old_counter = self._swap_call_counter()
            self._send_calls(old_counter)
            self._fetch_calls(list(old_counter))
            self._sample("prefix:resource:user")
            gevent.sleep(self._interval)

    def _swap_call_counter(self):
        """Replace the local counter with an empty one and return the old one."""
        old_counter, self._call_counter = self._call_counter, Counter()
        return old_counter

    def _send_calls(self, call_counter):
        """Add every locally counted call to its Redis counter."""
        for key, count in call_counter.items():
            # Arguments after the script: numkeys, KEYS[1], ARGV[1], ARGV[2].
            redis.eval(self._lua_incr, 1, key, count, self._expire_seconds)

    def _fetch_calls(self, keys):
        """Refresh the cached shared totals for ``keys``.

        When ``keys`` is empty the previous snapshot is left untouched.
        """
        keys = list(keys)
        if not keys:
            logging.warning("no key")
            return
        values = redis.mget(*keys)
        # MGET yields None for a key that no longer exists (e.g. it expired
        # between the increment and this read); count that as zero instead of
        # crashing on int(None).
        self._overall_call_counter = {
            key: int(value or 0) for key, value in zip(keys, values)
        }
        logging.info("overall: %s", self._overall_call_counter)

    def is_over(self, key):
        """Return True when a call for ``key`` should be rejected.

        If the cached total exceeds the limit, the call is rejected with a
        probability equal to the excess share ``(total - limit) / total``;
        otherwise it is always accepted.
        """
        total = self._overall_call_counter.get(key, 0)
        delta = total - self._limit
        if delta <= 0:
            return False
        delta_percent = float(delta) / total * 100
        # random() is uniform over [0, 1), so the rejection probability is
        # exactly delta_percent; the inclusive randint(0, 100) used before
        # had 101 outcomes and let calls through even at 100 percent.
        return self._rand.random() * 100 < delta_percent

    def incr(self, key, count=1):
        """Record ``count`` calls for ``key`` in the local counter."""
        self._call_counter[key] += count

    def _sample(self, rid):
        """Append one data point for ``rid`` to the module-level ``sample``.

        The point holds the limit, the requests and the successes seen since
        the previous sample, and the cached shared total for ``rid``.
        """
        overall = self._overall_call_counter.get(rid, 0)
        total = counter["failed"] + counter["successful"]
        success = counter["successful"]
        sample.append(self._limit,
                      total - self._tmp["last_total"],
                      success - self._tmp["last_success"],
                      overall)
        self._tmp.update(last_total=total, last_success=success)
def make_client(limiter):
    """Build a simulated client bound to ``limiter``.

    :param limiter: object exposing ``is_over(key)`` and ``incr(key)``.
    :returns: a one-argument callable (the argument is ignored) that sleeps
        for 1-10 ms, asks the limiter for a verdict, tallies the outcome in
        the module-level ``counter`` and reports the call to the limiter.
    """
    rid = "prefix:resource:user"

    def send_request(_):
        # Simulated request latency.
        gevent.sleep(random.randint(1, 10) * 0.001)
        if limiter.is_over(rid):
            counter["failed"] += 1
        else:
            counter["successful"] += 1
        # NOTE(review): indentation was lost in the extracted source. Every
        # attempt is assumed to be counted, because the limiter's rejection
        # ratio (total - limit) / total only settles at the limit when the
        # total includes rejected calls -- confirm against the original.
        limiter.incr(rid)

    return send_request
def spawn_requests(limiter, duration):
    """Send simulated requests in batches of 10 for about ``duration`` seconds.

    :param limiter: limiter handed to :func:`make_client`.
    :param duration: minimum wall-clock time, in seconds, to keep sending;
        the elapsed time is only checked between batches.

    Logs progress after every batch and prints the summary at the end.
    """
    pool = Pool(100)
    started_at = time.time()
    logging.info("Sending requests")
    batch = range(10)
    send_request = make_client(limiter)
    while True:
        # Materialising the iterator consumes every result of the batch
        # before the next one is submitted.
        list(pool.imap_unordered(send_request, batch))
        logging.info("Sent: %d/%d",
                     counter["successful"],
                     counter["successful"] + counter["failed"])
        if time.time() - started_at > duration:
            logging.info("%ds elapsed", duration)
            break
    logging.info("Waiting pool")
    pool.join()
    print_summary(duration)
def print_summary(duration):
    """Log mean total/passed QPS and every entry of the global ``counter``.

    :param duration: length of the run in seconds; a non-positive value is
        treated as one second so the averages never divide by zero.
    """
    seconds = duration if duration > 0 else 1
    passed = counter["successful"]
    total = passed + counter["failed"]
    logging.info("Mean:")
    logging.info(" Total QPS: %d", total / seconds)
    logging.info(" Passed QPS: %d", passed / seconds)
    for name, value in counter.items():
        logging.info("{:<20} {:>10}".format(name, value))
class Sample(object):
    r"""Collect timestamped samples in memory and dump them to a text file.

    Every output line is ``<unix time> <value> <value> ...``. The file can be
    plotted with a gnuplot script such as::

        set terminal png
        set terminal png size 1920,1080
        set xdata time
        set timefmt "%s"
        set format x "%S"
        plot "sample.txt" index 0 using 1:2 title 'Limit' with linespoints, \
        "sample.txt" index 0 using 1:3 title 'Total' with linespoints, \
        "sample.txt" index 0 using 1:4 title 'Succeed' with linespoints, \
        "sample.txt" index 0 using 1:5 title 'Overall' with linespoints
    """

    def __init__(self, filename):
        """Remember the output path; nothing is written until :meth:`close`."""
        self.filename = filename
        self._lines = []

    def append(self, *args):
        """Buffer one line made of the current time followed by ``args``."""
        fields = " ".join("{}".format(arg) for arg in args)
        # A native str (not a bytes literal) so the line can be formatted and
        # written through a text-mode file on both Python 2 and Python 3.
        self._lines.append("{} {}\n".format(time.time(), fields))

    def close(self):
        """Write all buffered lines to ``filename``; no-op when there are none."""
        if not self._lines:
            return
        with open(self.filename, "w") as fl:
            fl.writelines(self._lines)
# Module-level sample sink; data points are buffered in memory and written to
# "sample.txt" when close() is called.
sample = Sample("sample.txt")
def main():
    """Parse ``<limit> <duration>`` from argv and run the simulation.

    Exits with status 1 after printing a usage line when the arguments are
    missing or not integers.
    """
    # NOTE(review): gevent's documentation advises monkey-patching as early
    # as possible; here it runs after the other modules were imported and the
    # module-level Redis client was created -- confirm this is intentional.
    patch_all()
    try:
        limit, duration = int(sys.argv[1]), int(sys.argv[2])
    except (IndexError, ValueError):
        print("Usage: {} <limit> <duration>".format(sys.argv[0]))
        # sys.exit instead of the bare exit() helper, which is only injected
        # by the site module and is not guaranteed to exist.
        sys.exit(1)
    logging.basicConfig(level=logging.INFO, format="%(asctime)s: %(message)s")
    limiter = Limiter(limit)
    limiter.start()
    try:
        spawn_requests(limiter, duration)
    finally:
        # Stop the background loop and flush the collected samples even when
        # the run is interrupted or fails.
        limiter.stop()
        sample.close()
if __name__ == '__main__':
    try:
        main()
    finally:
        # Flush buffered samples even when main() raises or exits early;
        # close() does nothing when no sample was recorded.
        sample.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment