Skip to content

Instantly share code, notes, and snippets.

@egh
Last active June 27, 2016 23:06
Show Gist options
  • Save egh/8c88738662f62ca8bc2acf0be6f9c12a to your computer and use it in GitHub Desktop.
Save egh/8c88738662f62ca8bc2acf0be6f9c12a to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
from datetime import datetime
from decimal import Decimal
import json
import re
import requests
import sys
import signal
import socket
import subprocess
import time
from urlparse import urlparse
from numpy import mean, median, std
try:
requests.packages.urllib3.disable_warnings()
except AttributeError:
pass
def main():
parser = argparse.ArgumentParser()
parser.add_argument('-u', '--url', help='url to time', required=True)
parser.add_argument('-i', '--interface', help='network interface to packet dump', required=True)
parser.add_argument('-s', '--sudo', help='use sudo to run tcpdump', default=False, action='store_true')
parser.add_argument('-n', '--number', help='number of requests', required=True)
parser.add_argument('-H', '--host', help='override url host header', default=None)
args = parser.parse_args()
url = urlparse(args.url)
if args.host:
host = args.host
else:
host = url.hostname
timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H_%M_%SZ") # : not allowed in smbshare file name
destip=socket.gethostbyname(url.hostname)
srcip=socket.gethostbyname(socket.gethostname()) # not really accurate but ...
base_path = "ITP-798-test_prot=%s,destip=%s,srcip=%s,host=%s,lastpath=%s,time=%s"%(url.scheme, destip, srcip, host, url.path.split('/')[-1], timestamp)
tcpdump_path = "%s.pcp"%(base_path)
port = url.port
if port is None:
if url.scheme == 'http': port = 80
elif url.scheme == 'https': port = 443
mtr = subprocess.check_output(["mtr", "-Cb", url.hostname, "-T", "-P", str(port)])
mtr_data = [ line.split(';')[5] for line in mtr.split("\n") if line != "" ]
tcpdump = start_tcpdump(args.sudo, url.hostname, tcpdump_path, args.interface)
requests = list(generate_times(args.url, int(args.number), host=args.host))
times = [ req['elapsed_time'] for req in requests ]
stop_tcpdump(args.sudo, tcpdump, tcpdump_path)
with open("%s.log.json"%(base_path), 'w') as f:
results = {
'url': args.url,
'srcip': srcip,
'destip': destip,
'mtr': mtr_data,
'stats': {
'min': min(times),
'max': max(times),
'mean': mean(times),
'median': median(times),
'stdev': std(times),
'n': len(times)
},
'requests': requests
}
f.write(json.dumps(results))
def start_tcpdump(use_sudo, host, tcpdump_path, interface):
# prompt for sudo password - hard/impossible to get from popen
if use_sudo:
subprocess.call(["sudo", "/bin/true"])
tcpdump_args = ["tcpdump", "host", host, "-w", tcpdump_path, "-s0", "-i", interface]
if use_sudo:
tcpdump_args = ["sudo"] + tcpdump_args
return subprocess.Popen(tcpdump_args)
def stop_tcpdump(use_sudo, process, tcpdump_path):
if use_sudo:
# the process we want to kill is the CHILD of the sudo process
# linux only
child_pid = subprocess.check_output(["ps", "-o", "pid", "--ppid", str(process.pid), "--noheaders"]).strip()
subprocess.call(["sudo", "kill", "-TERM", child_pid])
else:
process.send_signal(signal.SIGTERM)
# wait for tcpdump to finish
time.sleep(1)
subprocess.call(["gzip", tcpdump_path])
def generate_times(url, limit, host=None):
counter = 0
while counter < limit:
if host:
headers = {'Host': host, 'Connection': 'close'}
else:
headers = {'Connection': 'close'}
try:
start = datetime.now()
request = requests.get(url, verify=False, headers=headers)
end = datetime.now()
request.raise_for_status()
delta = end - start
yield { 'status': request.status_code, 'elapsed_time': delta.total_seconds(), 'now': str(datetime.now()) }
except Exception as ex:
print('{}'.format(ex))
pass
counter += 1
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment