Skip to content

Instantly share code, notes, and snippets.

@mnot
Last active December 16, 2015 09:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mnot/5412567 to your computer and use it in GitHub Desktop.
Save mnot/5412567 to your computer and use it in GitHub Desktop.
Summarise how a list of HTTP servers handles a particular request to a set of URLs.
"""
htlook.py
Summarise how a list of HTTP servers handles a particular request to a
set of URLs.
You can modify the request by either:
* Adding headers with the -a flag; e.g.,
> ./htlook.py -a Foo: bar http://www.example.com/
will add
Foo: bar
to the request. Or,
* Specifying a request template file using the -t flag; e.g.,
> ./htlook.py -t test.req http://www.example.com/
where test.req contains:
---8<---
GET %(path)s HTTP/1.1
Host: %(authority)s
Connection: close
User-Agent: foo/1.0
--->8---
(note the trailing newlines!)
"""
import argparse
from collections import Counter
import sys
import thor
from thor.http import header_dict
from thor.http.common import HttpMessageHandler
from thor import on
from urlparse import urlsplit
#pylint: disable=W0311
# Timeout shared by every test: handed to TcpClient.connect() for raw
# template tests and used as both connect_timeout and read_timeout of the
# HTTP client. Presumably seconds -- confirm against thor's documentation.
timeout = 5
class HtLooker(object):
    """
    Look at one or more URLs and find out how they handle a given request.

    `args` is the namespace produced by parse_options(). Each URL is tested
    either with thor's HTTP client (test_http) or, when a request template
    was given, by writing the rendered template to a raw TCP connection
    (test_tcp). Every test reports a result dict to task_finished(); once
    no tests are outstanding the event loop is stopped and a summary is
    printed.

    Note: print calls use the single-argument parenthesised form so the
    syntax is valid on both Python 2.7 and Python 3.
    """

    def __init__(self, args):
        self.args = args
        # Number of tests started but not yet finished.
        self.outstanding = 0
        # Names of result fields to accumulate for the final summary.
        self.collect = set(self.args.collect or [])
        # Maps each collected field name to the list of values seen.
        self.results = {thing: [] for thing in self.collect}
        self.show_progress = False

    def run(self):
        "Run the looker: start one test per URL, then run the event loop."
        if self.args.urls == ['-']:
            # Ignore blank lines so they are not tested as (invalid) URLs.
            urls = [line.strip() for line in sys.stdin.read().splitlines()]
            urls = [url for url in urls if url]
        else:
            urls = self.args.urls
        if not urls:
            # Without any test nothing would ever stop the event loop.
            self.bail("No URLs to test.")
        if len(urls) > 9:
            self.show_progress = True
        self.progress("Loaded %i URLs." % len(urls))
        template = None
        if self.args.template:
            try:
                with open(self.args.template) as template_file:
                    template = template_file.read()
            except IOError as why:
                self.bail(why)
        headers = self._request_headers()
        # Hold one extra "outstanding" slot while tests are being started:
        # a test that fails synchronously must not bring the count to zero
        # (and print the summary) before the remaining URLs are started.
        self.outstanding += 1
        for url in urls:
            if self.args.template:
                self.test_tcp(url, template)
            else:
                # Each exchange gets its own list of headers.
                self.test_http(url, list(headers))
        self.outstanding -= 1
        if self.outstanding == 0:
            # Everything already finished; no need to enter the loop.
            self.show_results()
            return
        thor.run()

    def _request_headers(self):
        """
        Return the headers given with -a as stripped (name, value) tuples.

        A trailing colon on the name is dropped so that the documented
        usage `-a Foo: bar` produces the header `Foo: bar`.
        """
        return [
            (name.strip().rstrip(':').rstrip(), value.strip())
            for name, value in self.args.headers or []
        ]

    def bail(self, why):
        "Oops: write why to stderr and exit with status 1."
        sys.stderr.write("%s\n" % why)
        sys.exit(1)

    def progress(self, message):
        "Write message to stderr, but only when progress display is on."
        if self.show_progress:
            sys.stderr.write("%s\n" % message)

    def task_start(self):
        "Remember we started something."
        self.outstanding += 1

    def task_finished(self, result):
        "Find out we finished it; stop and summarise after the last one."
        self.store_result(result)
        self.outstanding -= 1
        if self.outstanding % 10 == 0:
            self.progress("%i URLs remaining..." % self.outstanding)
        if self.outstanding == 0:
            thor.stop()
            self.show_results()

    def store_result(self, result):
        """
        Record the collected fields of result and print whatever the
        --status-show / --header-show / --error-show options ask for.
        """
        for thing in self.collect:
            self.results[thing].append(result.get(thing))
        # Results of failed tests carry no 'status' key at all.
        status = result.get('status')
        if self.args.status_show and status in self.args.status_show:
            print(" status %s - %s" % (status, result['url']))
        if self.args.header_show:
            headers = header_dict(result.get('headers', []))
            for header in self.args.header_show:
                print(" header %s - %s" % (
                    header, headers.get(header.lower(), '-')))
        if self.args.error_show and 'err' in result:
            print(" error %s - %s" % (result['err'], result['url']))

    def show_results(self):
        "Print the summary: a histogram of statuses if they were collected."
        print('-' * 80)
        if 'status' in self.collect:
            status_hist = Counter(self.results['status'])
            # None (test produced no status) sorts first; the explicit key
            # avoids comparing None with strings, which Python 3 rejects.
            ordered = sorted(status_hist, key=lambda s: (s is not None, s))
            for status in ordered:
                print("%s - %i" % (status, status_hist[status]))

    @staticmethod
    def _tcp_target(request_uri):
        """
        Return the (host, port) to connect to for request_uri.

        The port is an int and defaults to 80. The host is None when the
        URL has no authority. Raises ValueError for an invalid port.
        """
        parts = urlsplit(request_uri)
        return parts.hostname, parts.port or 80

    def test_tcp(self, request_uri, req_template):
        """
        Test request_uri using req_template.

        The template is %-formatted with the keys scheme, authority, path,
        query and fragment taken from the URL, and the result is written
        verbatim to a TCP connection; the response is parsed by
        TestHttpHandler, which reports to task_finished.
        """
        self.task_start()
        url = dict(
            zip(["scheme", "authority", "path", "query", "fragment"],
                urlsplit(request_uri)))
        msg = req_template % url
        failure = None
        test_host, test_port = None, 80
        try:
            test_host, test_port = self._tcp_target(request_uri)
        except ValueError as why:
            failure = str(why)
        if failure is None and not test_host:
            failure = "No host in URL"
        if failure is not None:
            # Report unusable URLs like any other failed connection.
            self.task_finished({
                "url": request_uri,
                "type": "tcp",
                "body": "",
                "err": failure
            })
            return
        c = thor.TcpClient()

        @on(c)
        def connect(conn):
            p = TestHttpHandler(self.task_finished, request_uri)
            conn.on('data', p.handle_input)
            conn.on('close', p.input_end)
            conn.write(msg)
            conn.pause(False)

        @on(c)
        def connect_error(err_type, err_id, err_str):
            result = {
                "url": request_uri,
                "type": "tcp",
                "body": "",
                "err": err_str
            }
            self.task_finished(result)

        c.connect(test_host, test_port, timeout)

    def test_http(self, request_uri, req_headers):
        """
        Test request_uri and req_headers.

        Sends a GET with thor's HTTP client and accumulates status, phrase,
        headers, body and trailers (or the error description) into a result
        dict that is passed to task_finished.
        """
        self.task_start()
        c = thor.http.HttpClient()
        c.connect_timeout = timeout
        c.read_timeout = timeout
        x = c.exchange()
        result = {'url': request_uri, 'type': 'http', 'body': ''}

        @on(x)
        def response_start(status, phrase, headers):
            result['status'] = status
            result['phrase'] = phrase
            result['headers'] = headers

        @on(x)
        def response_body(chunk):
            result['body'] += chunk

        @on(x)
        def response_done(trailers):
            result['trailers'] = trailers
            self.task_finished(result)

        @on(x)
        def error(err):
            result['err'] = err.desc
            self.task_finished(result)

        x.request_start("GET", request_uri, req_headers)
        x.request_done([])
class TestHttpHandler(HttpMessageHandler):
    """
    Http parser for stuff we get back from test_tcp.

    Accumulates the parsed response into a result dict (url, type, status,
    phrase, headers, body, trailers and/or err) and hands that dict to the
    callback `cb` exactly once, when the input ends or fails to parse.
    """

    def __init__(self, cb, request_uri):
        HttpMessageHandler.__init__(self)
        self.cb = cb
        # Guards against reporting twice, e.g. when the end of the message
        # is followed by the connection's close event.
        self.cb_called = False
        self.result = {"url": request_uri, "type": "tcp", "body": ""}

    def input_start(self, top_line, hdr_tuples, conn_tokens,
                    transfer_codes, content_length):
        """
        Record the status code, reason phrase and headers of the response.

        top_line is the status line; a missing reason phrase (e.g.
        "HTTP/1.1 200") is accepted and recorded as an empty string.

        Always returns True, i.e. the message is treated as allowing a body.

        Raises ValueError when the status line has no status code, to
        indicate that there's a problem and parsing cannot continue.
        """
        parts = top_line.split(None, 2)
        if len(parts) < 2:
            raise ValueError("Malformed status line: %r" % top_line)
        self.result['status'] = parts[1]
        self.result['phrase'] = parts[2] if len(parts) > 2 else ""
        self.result['headers'] = hdr_tuples
        return True

    def input_body(self, chunk):
        "Process a body chunk from the wire."
        self.result['body'] += chunk

    def input_end(self, trailers=None):
        """
        Indicate that the response body is complete. Optionally can contain
        trailers; when omitted, an empty list is recorded.
        """
        # A fresh list per call, so results never share one default object.
        self.result['trailers'] = [] if trailers is None else trailers
        self._finish()

    def input_error(self, err):
        "Indicate an unrecoverable parsing problem with the input stream."
        self.result['err'] = err.desc
        self._finish()

    def _finish(self):
        "Deliver the result to the callback, at most once."
        if not self.cb_called:
            self.cb_called = True
            self.cb(self.result)
def set_max_fds(max_fds):
    """
    Make sure we can use max_fds file descriptors.

    Raises the soft RLIMIT_NOFILE limit towards max_fds, capped by the hard
    limit (and, on macOS, by the kern.maxfilesperproc sysctl). The hard
    limit is left as it is, and a soft limit that is already high enough is
    never lowered.
    """
    import resource
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    unlimited = resource.RLIM_INFINITY
    ceilings = [max_fds]
    if hard != unlimited:
        # RLIM_INFINITY may be reported as -1, so it must not go into min().
        ceilings.append(hard)
    if sys.platform == 'darwin':
        from subprocess import check_output
        ceilings.append(
            int(check_output(["sysctl", "-n", "kern.maxfilesperproc"])))
    new_soft = min(ceilings)
    if soft == unlimited or soft >= new_soft:
        return
    # Keep the existing hard limit: an unprivileged process is not allowed
    # to raise it, so asking for "unlimited" here would make the call fail.
    resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
def parse_options(argv=None):
    """
    Parse command-line options and return args.

    argv is the list of argument strings to parse; when None (the default)
    argparse reads them from sys.argv[1:].

    The returned namespace has the attributes urls, headers, template,
    collect, status_show, header_show and error_show.
    """
    parser = argparse.ArgumentParser(
        description='Test how a server handles a HTTP request.'
    )
    parser.add_argument(
        'urls', metavar='URL', nargs='+',
        help='a URL to test; "-" to test STDIN')
    parser.add_argument(
        '-a', dest='headers', metavar='header', action='append', nargs=2,
        help='append a header to requests; '
             'takes two arguments (name and value)')
    parser.add_argument(
        '-t', dest='template',
        help='use template for a raw test (disables -a)')
    parser.add_argument(
        '--status', const='status', dest='collect', action='append_const',
        help='Collect the response status')
    parser.add_argument(
        '--status-show', action='store', nargs="+",
        help="Show URLs with the provided status code(s)")
    parser.add_argument(
        '--header-show', action='store', nargs="+",
        help="Show values of the indicated header(s)")
    parser.add_argument(
        '--error-show', action='store_true',
        help="Show URLs that have errors")
    return parser.parse_args(argv)
if __name__ == "__main__":
    # Lift the descriptor limit before anything else: run() starts a test
    # for every URL before the event loop gets going.
    set_max_fds(20000)
    options = parse_options()
    looker = HtLooker(options)
    looker.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment