Created
November 27, 2016 21:11
-
-
Save JPvRiel/604cef22e32aaedd15e6d888391b3dd4 to your computer and use it in GitHub Desktop.
A web scraping example using Python. It used ssllabs.com to test a list of web sites and scraped the results. This was only useful back in 2015, before SSL Labs published an API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Python script to test ssl via ssllabs service
# - input: space or newline delimited set of hostnames to test
# - process: sends request, polls till results are complete
# - output: saves html page
# - output: prints summary results per site in csv
# - output format: <fqdn/hostname>,<http status code>,<overall rating>,<certificate score %>,<protocol support score %>,<key exchange score %>,<cipher strength score %>
# python libs to use | |
import csv | |
from lxml import html | |
import re | |
import requests | |
import socket | |
import sys, os, time, datetime | |
# TODO: major enhancement - refactor into a class/object to get results, and use threading and timers...
# constants
BASEURL = 'https://www.ssllabs.com/ssltest/analyze.html'
# global variables
domains = []
connection_timeout = 30  # seconds per HTTP request
attempt_timeout = 900  # seconds overall before giving up on one domain
poll_interval = connection_timeout  # seconds between result polls
# bugfix: derive polls from attempt_timeout (was a hardcoded 900) and use
# floor division so attempts is an int rather than a Py3 float
attempts = attempt_timeout // poll_interval
recurse_depth = 0  # guards recursion when a domain resolves to multiple IPs
f_in_domains = None  # current input file handle (set in the main loop)
f_out_results = None  # current CSV output file handle
f_out_results_csv = None  # csv.writer over f_out_results
# pre-compile regex for extracting the pixel width of a summary chart bar
RE_GRAPH_SCORE_PX = r"width:(?P<width>\d+)px"
GRAPH_SCORE_PX_WIDTH = 300  # a full-width (300px) bar represents 100%
graph_width_pattern = re.compile(RE_GRAPH_SCORE_PX)
## function to scrape and save results ##
def _graph_score_percent(style_attr):
    # Reverse-engineer a summary-chart bar length into a percentage score:
    # the inline style looks like "width:240px" and a full-width bar of
    # GRAPH_SCORE_PX_WIDTH (300) px means 100%.
    return int(graph_width_pattern.match(style_attr).group('width')) * 100 / GRAPH_SCORE_PX_WIDTH


def testDomainSSLWeb(d, ip=None):
    """Test one domain's SSL config via the ssllabs.com web UI and record the result.

    Polls the analyze page every poll_interval seconds (up to the global
    attempts limit) until the report is complete, scrapes the summary scores,
    writes one CSV row via the global f_out_results_csv writer, and saves a
    copy of the report page HTML under SAVE_SUBDIR.

    If ssllabs returns multiple IPs for the domain, this recurses once per IP
    (bounded by the module-level recurse_depth counter) and returns early so
    that only the per-IP child calls write results.

    d  -- domain/hostname to test
    ip -- optional specific IP address of the domain to test
    """
    # report action
    host = d.strip() if ip is None else d.strip() + '(' + ip + ')'
    sys.stdout.write(host)
    # build URL
    if ip:
        request_URL = BASEURL + '?d=' + d + '&s=' + ip + '&hideResults=on&ignoreMismatch=on'
    else:
        request_URL = BASEURL + '?d=' + d + '&hideResults=on&ignoreMismatch=on'
    # CSV row skeleton; indexes: 0=timestamp, 1=input file, 2=fqdn, 3=ip,
    # 4=status, 5=rating, 6=cert %, 7=protocol %, 8=key exchange %,
    # 9=cipher %, 10=warnings, 11=errors
    result = [datetime.datetime.now().isoformat(), os.path.basename(f_in_domains.name), d, 'unknown', 'unknown', 'NA', 'NA', 'NA', 'NA', 'NA', '', '']
    # function variables
    page = None
    tree = None
    # values to capture
    report_ip = None
    overall_rating = None
    certificate = None
    protocols = None
    key_exchange = None
    ciphers = None
    # main X-Path locations
    MAIN_BRANCH = '/html/body/div[@id="page"]/div[@id="main"]'
    RESULT_SUMMARY_BRANCH = MAIN_BRANCH + '//div[@class="sectionTitle"][text()="Summary"]/following-sibling::div[@class="sectionBody"][1]'
    # keep polling for results until a == attempts
    a = 0
    while a < attempts:
        a = a + 1
        # print dots to show progress
        sys.stdout.write('.')
        sys.stdout.flush()
        # send request
        try:
            page = requests.get(request_URL, timeout=connection_timeout)
        except (requests.exceptions.Timeout, socket.timeout):
            print ('\nWARNING: Timeout occurred. Will try again...')
            time.sleep(poll_interval)
            continue  # try again
        except requests.exceptions.RequestException as e:
            print ('\nERROR: request exception occurred. Will abort attempt!')
            print (e)
            result[4] = 'test error'
            result[11] = ' [E] request exception occurred. Domain not tested.'
            break
        # check if we got a valid HTTP response
        http_status = page.status_code
        if (http_status != 200):
            # bugfix: status_code is an int and must be str()'d before
            # concatenation (the old code raised TypeError here)
            print ('\nWARNING: HTTP response status code indicates request failed. HTTP ' + str(http_status) + ' returned. Will try again...')
            time.sleep(poll_interval)
            continue  # try again
        # extract page content as XML tree
        tree = html.fromstring(page.text)
        # skip and poll until the "please wait" warning has cleared, else we
        # would attempt to scrape an incomplete result
        test_warning_wait = tree.xpath(MAIN_BRANCH + '/div[@id="warningBox" and contains(string(.), "Please wait")]')
        if test_warning_wait:
            time.sleep(poll_interval)
            continue
        # check for error returned in page content
        test_errors = tree.xpath(MAIN_BRANCH + '//div[@class="submitError"]/text()')
        if test_errors:
            result[4] = 'test error'
            result[11] = ' [E] ' + ' [E] '.join(test_errors).strip()
            break
        # check if multiple ip addresses have been returned for a single domain
        multitable_IPs = tree.xpath(MAIN_BRANCH + '/table[@id="multiTable"]//span[@class="ip"]')
        if multitable_IPs:
            print ("\nINFO: multiple IPs returned for the domain, testing per IP instead...")
            global recurse_depth
            recurse_depth = recurse_depth + 1
            if recurse_depth > 3:
                print ('\nERROR: request is recursing too far (3 levels). Will abort attempt!')
                # bugfix: undo the increment before bailing out, otherwise the
                # counter stays elevated and blocks every later multi-IP test
                recurse_depth = recurse_depth - 1
                return
            for ip_span in multitable_IPs:
                # recursive call to test each individual IP
                testDomainSSLWeb(d, ip_span.xpath('string(.)'))
            recurse_depth = recurse_depth - 1
            # return early: results are reported by the per-IP child calls,
            # not by this parent call
            return
        # check for warning box in main page branch which appears before test results
        # - ignore warning box if it contains 'Please wait'
        # - capture/report warning box if it doesn't contain 'Please wait'
        test_warnings_main = tree.xpath(MAIN_BRANCH + '/div[@id="warningBox" and not(contains(string(.), "Please wait"))]')
        test_warnings_cleaned = []
        for w in test_warnings_main:
            w_clean = [w_str.strip().replace('\n', '').replace('\r', '').replace('\t', '') for w_str in w.xpath('./text() | ./a/text()')]
            # in case there was a link, integrate and simply single-quote the link text (no URL provided)
            test_warnings_cleaned.append("'".join(filter(None, w_clean)))
        if test_warnings_cleaned:
            result[4] = 'test warning'
            result[10] = ' [W] ' + ' [W] '.join(test_warnings_cleaned)
            break
        # attempt to extract the data we want, as eventually ssllabs will
        # update the page with results; the string() x-path function handles
        # cases where formatting sometimes includes additional elements/nodes
        overall_rating = tree.xpath('string(' + RESULT_SUMMARY_BRANCH + '//div[@id="rating"]/div[2])')
        # the page no longer shows % text, so scrape each chart bar's inline
        # style width instead, e.g. style="width:300px" implies 100%
        certificate = tree.xpath('string(' + RESULT_SUMMARY_BRANCH + '//div[@id="chart"]//div[@class="chartLabel"][text()="Certificate"]/following-sibling::div[1]/@style)')
        protocols = tree.xpath('string(' + RESULT_SUMMARY_BRANCH + '//div[@id="chart"]//div[@class="chartLabel"][text()="Protocol Support"]/following-sibling::div[1]/@style)')
        key_exchange = tree.xpath('string(' + RESULT_SUMMARY_BRANCH + '//div[@id="chart"]//div[@class="chartLabel"][text()="Key Exchange"]/following-sibling::div[1]/@style)')
        ciphers = tree.xpath('string(' + RESULT_SUMMARY_BRANCH + '//div[@id="chart"]//div[@class="chartLabel"][text()="Cipher Strength"]/following-sibling::div[1]/@style)')
        # check the data scraped so far and stop polling if we got all we wanted
        if (overall_rating and certificate and protocols and key_exchange and ciphers):
            result[4] = 'complete'
            break
        else:
            time.sleep(poll_interval)
    # check if errors occurred
    if (result[4] == 'test error' or result[4] == 'test warning'):
        print (" = ?")
        print ("WARNING: Skipping to next domain due to test error or warning")
    # check if it timed out before getting all the data wanted
    if (a == attempts and result[4] == 'unknown'):
        print (" = ?")
        print ("\nWARNING: Results are incomplete after %i attempts." % a)
        result[4] = 'incomplete'
    # cleanup and save text results
    if overall_rating:
        result[5] = ''.join(overall_rating).strip()
        print (" = " + result[5])
        sys.stdout.flush()
    if certificate:
        result[6] = _graph_score_percent(certificate)
    if protocols:
        result[7] = _graph_score_percent(protocols)
    if key_exchange:
        result[8] = _graph_score_percent(key_exchange)
    if ciphers:
        result[9] = _graph_score_percent(ciphers)
    if tree is not None:
        # attempt to include ip in result
        report_ip = tree.xpath(MAIN_BRANCH + '/div[@class="reportTitle"]/span[@class="ip"]/text()')
        if report_ip:
            result[3] = report_ip[0].strip().replace('(','').replace(')','')
        # add info about any warnings; note the text format used by the
        # website is very messy, sometimes containing links, etc.
        warnings = tree.xpath(RESULT_SUMMARY_BRANCH + '//div[@class="warningBox"]')
        warnings_cleaned = []
        for w in warnings:
            w_clean = [w_str.strip().replace('\n', '').replace('\r', '').replace('\t', '') for w_str in w.xpath('./text() | ./a/text() ')]
            # in case there was a link, integrate and simply single-quote the link text (no URL provided)
            warnings_cleaned.append("'".join(filter(None, w_clean)))
        if warnings_cleaned:
            result[10] = result[10] + ' [W] ' + ' [W] '.join(warnings_cleaned)
        # add info about any errors
        errors = tree.xpath(RESULT_SUMMARY_BRANCH + '//div[@class="errorBox"]')
        errors_cleaned = []
        for e in errors:
            e_clean = [e_str.strip().replace('\n', '').replace('\r', '').replace('\t', '') for e_str in e.xpath('./text() | ./a/text() ')]
            errors_cleaned.append("'".join(filter(None, e_clean)))
        if errors_cleaned:
            # bugfix: prefix with ' [E] ' to match the warning formatting above
            # (previously the first error ran straight into any prior text)
            result[11] = result[11] + ' [E] ' + ' [E] '.join(errors_cleaned)
    # output to csv
    f_out_results_csv.writerow(result)
    # save html to file
    if page is not None:
        f_out_html_name = result[2] + '-' + result[3] + '-sslabs.html'
        try:
            # bugfix: with-statement guarantees the file is closed, and the
            # except handler below no longer dereferences a possibly-unbound
            # file object (the old code read f_out_html.name after open failed)
            with open('./' + SAVE_SUBDIR + '/' + f_out_html_name, 'w') as f_out_html:
                f_out_html.write(page.text)
        except IOError:
            print ('WARNING: problem encountered saving web page as "' + f_out_html_name + '"')
            exit(1)
# main code to initiate reading input file, open output file, etc
if len(sys.argv) < 2:
    print ("Usage: ssllabs_cert_test_batch.py [file(s) with domains to test]...")
    exit(2)
print ('Using SSLLabs to test SSL quality of domains supplied')
# create dir for saving html copies of web page results
SAVE_SUBDIR = 'saved_html'
if not os.path.exists('./' + SAVE_SUBDIR):
    try:
        os.makedirs('./' + SAVE_SUBDIR)
    except OSError:
        print ('ERROR: could not use directory "' + SAVE_SUBDIR + '". Aborting!')
        exit(1)
print ('\n--- General')
print ('* html output dir: ' + SAVE_SUBDIR)  # save html returned
print ('* connection timeout: %is' % connection_timeout)
print ('* might take a long while... poll for results every %is until is %is (%i polls) is reached before giving up' % ( poll_interval, attempt_timeout, attempts ))
print ('---\n')
# FQDN matcher: dot-separated labels of up to 63 alphanumeric/hyphen chars,
# 4-253 chars overall; hoisted out of the loop so it is compiled/used per file
# rather than redefined on every iteration
RE_FQDN = r"(?=.{4,253})((?:(?!-)[a-zA-Z0-9-]{0,62}[a-zA-Z0-9]\.)+[a-zA-Z]{2,63})"
# read in domains to test
# note f_in_domains is defined as a global at the start
for f in sys.argv[1:]:
    if os.path.isfile(f):
        try:
            # with-statement closes the input file even if reading fails
            with open(f, 'r') as f_in_domains:
                f_in_basename = os.path.basename(f_in_domains.name)
                # pull every FQDN-shaped token, tolerating malformed lines
                domains = re.findall(RE_FQDN, f_in_domains.read())
            print ('--- input file: ' + f_in_basename)
            # open file for writing csv results
            f_out_name = f_in_basename + '-results.csv'
            try:
                # with-statement ensures results are flushed and the file is
                # closed (the old code never closed f_out_results)
                with open(f_out_name, 'w') as f_out_results:
                    f_out_results_csv = csv.writer(f_out_results)
                    CSV_HEADER = ['Timestamp', 'Domain Source', 'FQDN', 'IP', 'Status', 'Rating (A to Z)', 'certificate (%)', 'Protocol (%)', 'Key Exchange (%)', 'Ciphers (%)', '[W]arning(s)', '[E]rror(s)']
                    f_out_results_csv.writerow(CSV_HEADER)
                    # main loop to iterate through domains listed
                    for d in domains:
                        testDomainSSLWeb(d)
                    print ('--- output file: ' + f_out_results.name)
            except IOError:
                # bugfix: report the intended file name; the old handler read
                # f_out_results.name, which fails with AttributeError when the
                # open itself raised (f_out_results is still None then)
                print ('--- ERROR: could not use output file "' + f_out_name + '". Aborting!')
                break
        except IOError:
            print ('--- ERROR: could not use input file "' + f + '". Skipping.')
    else:
        print ('--- ERROR input file: "' + f + '" is not a valid file. Skipping.')
print ('\n')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment