Skip to content

Instantly share code, notes, and snippets.

@tevino
Last active May 24, 2017 07:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tevino/d303295c9567c43ee9291864fafd8a0a to your computer and use it in GitHub Desktop.
Save tevino/d303295c9567c43ee9291864fafd8a0a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
import random
import logging
from collections import Counter, defaultdict
from redis import StrictRedis
import gevent
from gevent.pool import Pool
from gevent.monkey import patch_all
counter = Counter()
redis = StrictRedis()
class Limiter(object):
_lua_incr = """
redis.call("incrby",KEYS[1],ARGV[1])
if redis.call("ttl",KEYS[1]) < 0 then
redis.call("expire",KEYS[1],ARGV[2])
end
"""
def __init__(self, limit, interval=1):
self._call_counter = Counter()
self._overall_call_counter = {}
self._limit = limit
self._interval = 1.0
self._running = False
self._rand = random.Random(time.time() + 42)
self._tmp = defaultdict(int)
def start(self):
if not self._running:
self._running = True
gevent.spawn(self._loop)
else:
logging.warn("Already started")
def stop(self):
self._running = False
def _loop(self):
while self._running:
old_counter = self._swap_call_counter()
self._send_calls(old_counter)
self._fetch_calls(old_counter.keys())
self._sample("prefix:resource:user")
gevent.sleep(self._interval)
def _swap_call_counter(self):
old_counter = self._call_counter
self._call_counter = Counter()
return old_counter
def _send_calls(self, call_counter):
for key, count in call_counter.iteritems():
redis.eval(self._lua_incr, 1, key, count, 1)
def _fetch_calls(self, keys):
if not keys:
logging.warn("no key")
return
self._overall_call_counter = dict(zip(keys, map(int, redis.mget(*keys))))
logging.info("overall: %s", self._overall_call_counter)
def is_over(self, key):
total = self._overall_call_counter.get(key, 0)
delta = total - self._limit
if delta > 0: # over the limit
delta_percent = float(delta) / total * 100
return self._rand.randint(0, 100) < delta_percent
return 0
def incr(self, key, count=1):
self._call_counter.update({key: count})
def _sample(self, rid):
overall = 0
if self._overall_call_counter.values():
overall = self._overall_call_counter.values()[0]
total = counter["failed"] + counter["successful"]
success = counter["successful"]
sample.append(self._limit,
total - self._tmp["last_total"],
success - self._tmp["last_success"],
overall)
self._tmp.update(last_total=total,
last_success=success)
def make_client(limiter):
def send_request(_):
gevent.sleep(random.randint(1, 10) * 0.001)
rid = "prefix:resource:user"
is_over = limiter.is_over(rid)
if is_over:
counter.update(failed=1)
else:
counter.update(successful=1)
limiter.incr(rid)
return send_request
def spawn_requests(limiter, duration):
pool = Pool(100)
st = time.time()
logging.info("Sending requests")
times = range(10)
send_request = make_client(limiter)
while True:
list(pool.imap_unordered(send_request, times))
logging.info("Sent: %d/%d",
counter["successful"],
counter["successful"] + counter["failed"])
if time.time() - st > duration:
logging.info("%ds elapsed", duration)
break
logging.info("Waiting pool")
pool.join()
print_summary(duration)
def print_summary(duration):
logging.info("Mean:")
logging.info(" Total QPS: %d",
(counter["successful"] +
counter["failed"]) / duration)
logging.info(" Passed QPS: %d",
counter["successful"] / duration)
for k, v in counter.iteritems():
logging.info("{:<20} {:>10}".format(k, v))
class Sample(object):
"""
set terminal png
set terminal png size 1920,1080
set xdata time
set timefmt "%s"
set format x "%S"
plot "sample.txt" index 0 using 1:2 title 'Limit' with linespoints, \
"sample.txt" index 0 using 1:3 title 'Total' with linespoints, \
"sample.txt" index 0 using 1:4 title 'Succeed' with linespoints, \
"sample.txt" index 0 using 1:5 title 'Overall' with linespoints
"""
def __init__(self, filename):
self.filename = filename
self._lines = []
def append(self, *args):
formatter = " ".join(("{}",) * len(args))
sample = formatter.format(*args)
self._lines.append(b"{} {}\n".format(time.time(), sample))
def close(self):
if not self._lines:
return
with open(self.filename, "w") as fl:
fl.writelines(self._lines)
sample = Sample("sample.txt")
def main():
patch_all()
limit = None
duration = None
try:
limit, duration = int(sys.argv[1]), int(sys.argv[2])
except (IndexError, ValueError):
print("Usage: {} <limit> <duration>".format(sys.argv[0]))
exit(1)
logging.basicConfig(level=logging.INFO, format="%(asctime)s: %(message)s")
limiter = Limiter(limit)
limiter.start()
spawn_requests(limiter, duration)
sample.close()
if __name__ == '__main__':
main()
sample.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment