Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

Stress test websites

View gist:e825ce14f06dc198926e
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
#!/usr/bin/env python
"""
Stress test a web site
Usage:
stress_test_replay [options]
Options:
--host=<host> The host to connect to [default: http://www.example.com].
-n --num_workers=<num> The number of gevent workers to run [default: 2].
-q --quiet Disable mass delisionary screen output
--help Show this screen.
"""
from gevent.monkey import patch_all
 
# patch the standard library
patch_all()
 
import sys
import time
import os
import curses
 
import gevent
import grequests
from docopt import docopt
 
# Shared state, keyed by worker id, written by the worker greenlets and
# read by the status display and the final summary.

# worker id -> last HTTP status code seen (0 until a response arrives)
worker_responses = dict()

# worker id -> True once that worker has finished reading data
finished_workers = dict()

# worker id -> seconds the worker spent sending its request and
# handling the response
worker_times = dict()

# Fallback number of gevent workers. The usage text declares its own
# default of 2 for --num_workers, so this value only applies when the
# supplied option cannot be parsed as an integer.
num_workers = 10
 
 
def print_statuses(_window):
    """Print counts of clients receiving data via curses

    Redraws the window every 0.1 seconds: the first row shows how many
    workers have finished, the following rows show one
    ``(worker id) -> status code`` cell per worker, wrapped to the window
    width. Returns once the number of entries in ``finished_workers``
    equals the module-level ``num_workers``.

    Cells that would fall below the last row of the window are skipped
    instead of being written, because writing outside the window makes
    curses raise ``curses.error`` and would kill this greenlet.

    :param _window: a curses window object
    """
    # get the window size (read once; later terminal resizes are not tracked)
    screen_y, screen_x = _window.getmaxyx()

    # status codes are right-aligned in a 4 character wide column
    value_format = '{:>4}'

    while True:
        finished = len(finished_workers)

        # break out if all workers have finished
        if num_workers == finished:
            break

        # the first line of the window displays how many workers finished
        _window.addstr(0, 0, 'Workers finished: {}'.format(finished))

        # write out each worker id and the status codes
        x, y = 0, 1
        for k, v in sorted(worker_responses.items()):
            # zero-pad ids so every cell has the same width
            worker_number = str(k).zfill(len(str(num_workers)))
            item_length = len(worker_number) + 12

            # wrap to the next row when the cell would not fit
            if (x + item_length) > screen_x:
                y += 1
                x = 0

            # out of vertical space: the remaining workers cannot be shown
            if y >= screen_y:
                break

            _window.addstr(y, x, '({}) -> {}'.format(worker_number, value_format.format(v)))
            x += item_length

        _window.refresh()
        gevent.sleep(0.1)
 
 
def worker(worker_number, host):
    """Worker that connects to a host and checks response

    Records the response status code in ``worker_responses`` and the
    elapsed time in ``worker_times``. The worker is always marked in
    ``finished_workers``, even when the request fails; otherwise the
    status display would wait forever for a worker that will never
    report. A failed worker keeps status 0 and records no time, and the
    exception still propagates to the caller.

    :param worker_number: the id of the worker
    :param host: the host to test
    """
    start_time = time.time()

    # 0 means "no response yet"
    worker_responses[worker_number] = 0
    try:
        request = grequests.get(host)
        # NOTE(review): assumes send() returns an object exposing
        # status_code -- confirm against the installed grequests version
        response = request.send()
        worker_responses[worker_number] = response.status_code
        worker_times[worker_number] = time.time() - start_time
    finally:
        finished_workers[worker_number] = True
 
 
def run(num_workers, host, use_curses):
    """Run X workers to simulate many clients requesting all at once

    Spawns one ``worker`` greenlet per client, optionally a curses status
    display, waits for all of them and prints timing statistics. A
    KeyboardInterrupt stops the wait early; the statistics are then
    printed for whatever has completed so far.

    NOTE: the status display compares against the module-level
    ``num_workers``, not this function's argument, so the two must agree
    when ``use_curses`` is true.

    :param num_workers: the number of workers to run
    :param host: the host every worker sends its request to
    :param use_curses: to use or not to use curses
    """
    # create the jobs
    jobs = [gevent.spawn(worker, i, host) for i in range(num_workers)]

    # append the curses screen print out
    if use_curses:
        window = curses.initscr()
        jobs.append(gevent.spawn(print_statuses, window))

    start_time = time.time()
    try:
        gevent.joinall(jobs)
    except KeyboardInterrupt:
        # Ctrl-C: stop waiting and fall through to the summary
        pass
    finally:
        # always hand the terminal back, whatever ended the wait
        if use_curses:
            curses.endwin()

    # average over the workers that actually recorded a time, so an
    # interrupted or partially failed run is not skewed and a run with no
    # completed workers does not divide by zero
    completed = len(worker_times)
    if completed:
        average = sum(worker_times.values(), 0.0) / completed
    else:
        average = 0.0

    print('Workers ran in average of {} seconds'.format(average))
    print('Total process time ran in {} seconds'.format(time.time() - start_time))
 
 
if __name__ == '__main__':
    # parse the command line against the usage text in the module docstring
    options = docopt(__doc__)

    try:
        num_workers = int(options['--num_workers'])
    except ValueError:
        # keep running with the module-level fallback, but tell the user
        # their value was ignored instead of silently changing the load
        sys.stderr.write(
            'Invalid --num_workers value {!r}; using {} workers instead\n'.format(
                options['--num_workers'], num_workers))

    # curses output is on unless --quiet was given
    run(num_workers, options['--host'], not options['--quiet'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.