Skip to content

Instantly share code, notes, and snippets.

@glenbot
Last active August 29, 2015 14:01
Show Gist options
  • Save glenbot/e825ce14f06dc198926e to your computer and use it in GitHub Desktop.
Save glenbot/e825ce14f06dc198926e to your computer and use it in GitHub Desktop.
Stress test websites
#!/usr/bin/env python
"""
Stress test a web site
Usage:
stress_test_replay [options]
Options:
--host=<host> The host to connect to [default: http://www.example.com].
-n --num_workers=<num> The number of gevent workers to run [default: 2].
-q --quiet Disable mass delisionary screen output
--help Show this screen.
"""
from gevent.monkey import patch_all
# patch the standard library
patch_all()
import sys
import time
import os
import curses
import gevent
import grequests
from docopt import docopt
worker_responses = {}
# A key/value of the worker id and a True/False
# value if it has finished reading data
finished_workers = {}
# A key/value of the worker id and total time it took to
# receive and process each message
worker_times = {}
# Default number of gevent workers to run
num_workers = 10
def print_statuses(_window):
"""Print counts of clients receiving data via curses
:param _window: a curses window object
"""
# get the window size
_, screen_x = _window.getmaxyx()
while True:
finished = len(finished_workers.keys())
# break out if all workers have finished
if num_workers == finished:
break
# the first line of the window displays how many workers finished
_window.addstr(0, 0, 'Workers finished: {}'.format(finished))
max_value_size = '{{:>{}}}'.format(4)
# write out each worker id and the status codes
x, y = 0, 1
for k, v in sorted(worker_responses.iteritems()):
worker_number = str(k).zfill(len(str(num_workers)))
item_length = len(worker_number) + 12
if (x + item_length) > screen_x:
y += 1
x = 0
_window.addstr(y, x, '({}) -> {}'.format(worker_number, max_value_size.format(v)))
x += item_length
_window.refresh()
gevent.sleep(0.1)
def worker(worker_number, host):
"""Worker that connects to a host and checks response
:param worker_number: the id of the worker
:param host: the host to test
"""
start_time = time.time()
worker_responses[worker_number] = 0
request = grequests.get(host)
response = request.send()
worker_responses[worker_number] = response.status_code
finished_workers[worker_number] = True
elapse_time = time.time() - start_time
worker_times[worker_number] = elapse_time
def run(num_workers, host, use_curses):
"""Run X workers to simulate many clients requesting all at once
:param num_workers: the number of workers to run
:param use_curses: to use or not to use curses
"""
jobs = []
window = None
# create the jobs
for i in range(0, num_workers):
jobs.append(gevent.spawn(worker, i, host))
# append the screen curses screen print out
if use_curses:
window = curses.initscr()
jobs.append(gevent.spawn(print_statuses, window))
try:
start_time = time.time()
gevent.joinall(jobs)
except KeyboardInterrupt:
if use_curses:
curses.endwin()
if use_curses:
curses.endwin()
total_time = 0.00
for _, value in worker_times.iteritems():
total_time += value
print 'Workers ran in average of {} seconds'.format(total_time / num_workers)
print 'Total process time ran in {} seconds'.format(time.time() - start_time)
if __name__ == '__main__':
options = docopt(__doc__)
try:
num_workers = int(options['--num_workers'])
except ValueError:
pass
run(num_workers, options['--host'], not options['--quiet'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment