Skip to content
Create a gist now

Instantly share code, notes, and snippets.

Stress test websites
#!/usr/bin/env python
Stress test a web site
stress_test_replay [options]
--host=<host> The host to connect to [default:].
-n --num_workers=<num> The number of gevent workers to run [default: 2].
-q --quiet Disable mass delisionary screen output
--help Show this screen.
from gevent.monkey import patch_all
# patch the standard library
import sys
import time
import os
import curses
import gevent
import grequests
from docopt import docopt
worker_responses = {}
# A key/value of the worker id and a True/False
# value if it has finished reading data
finished_workers = {}
# A key/value of the worker id and total time it took to
# receive and process each message
worker_times = {}
# Default number of gevent workers to run
num_workers = 10
def print_statuses(_window):
"""Print counts of clients receiving data via curses
:param _window: a curses window object
# get the window size
_, screen_x = _window.getmaxyx()
while True:
finished = len(finished_workers.keys())
# break out if all workers have finished
if num_workers == finished:
# the first line of the window displays how many workers finished
_window.addstr(0, 0, 'Workers finished: {}'.format(finished))
max_value_size = '{{:>{}}}'.format(4)
# write out each worker id and the status codes
x, y = 0, 1
for k, v in sorted(worker_responses.iteritems()):
worker_number = str(k).zfill(len(str(num_workers)))
item_length = len(worker_number) + 12
if (x + item_length) > screen_x:
y += 1
x = 0
_window.addstr(y, x, '({}) -> {}'.format(worker_number, max_value_size.format(v)))
x += item_length
def worker(worker_number, host):
"""Worker that connects to a host and checks response
:param worker_number: the id of the worker
:param host: the host to test
start_time = time.time()
worker_responses[worker_number] = 0
request = grequests.get(host)
response = request.send()
worker_responses[worker_number] = response.status_code
finished_workers[worker_number] = True
elapse_time = time.time() - start_time
worker_times[worker_number] = elapse_time
def run(num_workers, host, use_curses):
"""Run X workers to simulate many clients requesting all at once
:param num_workers: the number of workers to run
:param use_curses: to use or not to use curses
jobs = []
window = None
# create the jobs
for i in range(0, num_workers):
jobs.append(gevent.spawn(worker, i, host))
# append the screen curses screen print out
if use_curses:
window = curses.initscr()
jobs.append(gevent.spawn(print_statuses, window))
start_time = time.time()
except KeyboardInterrupt:
if use_curses:
if use_curses:
total_time = 0.00
for _, value in worker_times.iteritems():
total_time += value
print 'Workers ran in average of {} seconds'.format(total_time / num_workers)
print 'Total process time ran in {} seconds'.format(time.time() - start_time)
if __name__ == '__main__':
options = docopt(__doc__)
num_workers = int(options['--num_workers'])
except ValueError:
run(num_workers, options['--host'], not options['--quiet'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.