|
# coding=utf-8 |
|
import optparse |
|
import urlparse |
|
import time |
|
import json |
|
import sys |
|
import os |
|
from subprocess import call |
|
import xml.etree.cElementTree as ET |
|
|
|
# Fixed: was `_author_`, a typo for the standard `__author__` dunder.
__author__ = 'Andreas Broström'


# Script-level output switches; overwritten by parse_args() from the CLI flags.
DEBUG = False
VERBOSE = False
SDEBUG = False
SVERBOSE = False
SQUIET = False
NO_EXIT_CODE = False

# Debug verbosity level handed to arachni via --output-debug.
SDEBUG_LEVEL = 3

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
ARACHNI_DIR = SCRIPT_DIR  # Defaults to the script's path

# Report output layout (created on demand by init_check()).
REPORT_DIR = './scan_reports'
REPORT_ORIG_DIR = REPORT_DIR + '/original'
REPORT_HOST_DIR = REPORT_DIR + '/html'
REPORT_TMP_DIR = REPORT_DIR + '/tmp'

EXTEND_PATHS_FILE = REPORT_TMP_DIR + '/extend_paths.txt'
JUNIT_REPORT_FPATH = './TEST-security-ci_scanner.xml'

ARACHNI_TIMEOUT = '00:05:00'  # HH:MM:SS
PROFILE_NAME = 'ci_scanner.profile.afp'
PROFILE_PATH = SCRIPT_DIR + '/profiles/' + PROFILE_NAME

# Paths to the arachni executables; rebuilt by update_arachni_dir() when -a is used.
ARACHNI_PATH = ARACHNI_DIR + '/bin'
ARACHNI_FPATH = ARACHNI_PATH + '/arachni'
ARACHNI_RFPATH = ARACHNI_PATH + '/arachni_reporter'

# Exit codes (Used if not '-n' is passed)
ARACHNI_ERROR_CODE = 1
SCRIPT_ERROR_CODE = 2
WEBAPP_SEC_NA_CODE = 0  # If there is no "endpoints"-file we skip security testing

# References to these paid services are filtered out of the generated report.
PAID_REFS = ['secunia']
NORDNET_REF = 'Look for the latest "Secure Coding Guideline" at https://z-net.pilen.nordnet.se/it-guidelines/'
|
|
|
|
|
def update_arachni_dir(arachni_dir):
    """Point the module at a user-supplied arachni installation.

    Rebuilds every executable path derived from ARACHNI_DIR so the
    arachni binaries are looked up under the new location.
    """
    global ARACHNI_DIR, ARACHNI_PATH, ARACHNI_FPATH, ARACHNI_RFPATH

    ARACHNI_DIR = arachni_dir
    ARACHNI_PATH = '{0:s}/bin'.format(ARACHNI_DIR)
    ARACHNI_FPATH = '{0:s}/arachni'.format(ARACHNI_PATH)
    ARACHNI_RFPATH = '{0:s}/arachni_reporter'.format(ARACHNI_PATH)
|
|
|
|
|
def parse_args():
    """Parse the command line into the module-level output switches.

    Also redirects the arachni paths when -a/--arachni_dir_location is
    given. Returns the positional arguments (expected: the path to an
    endpoints.json file).
    """
    global DEBUG, VERBOSE, SQUIET, SDEBUG, SVERBOSE, NO_EXIT_CODE

    parser = optparse.OptionParser(
        usage="usage: %prog [options] path/to/endpoints.json",
        description='Scan a host for vulnerabilities. (Must be placed in arachni directory' +
                    ' if "-a" is not supplied)')

    # All boolean flags share the same shape, so register them table-driven.
    boolean_flags = [
        ('-d', '--debug', 'turns on debug output'),
        ('-v', '--verbose', 'turns on verbose output'),
        ('-q', '--scan_quiet', 'scanner will only output vulnerabilities'),
        ('-n', '--no_exit_code', 'returns 0 even if issues were found'),
        ('-b', '--scan_debug', 'turns on debug output for scanner'),
        ('-w', '--scan_verbose', 'turns on verbose output for scanner'),
    ]
    for short_opt, long_opt, help_text in boolean_flags:
        parser.add_option(short_opt, long_opt, action='store_true',
                          default=False, help=help_text)
    parser.add_option('-a', '--arachni_dir_location', type="string",
                      dest='arachni_dir',
                      help='point to the arachni directory location')

    options, args = parser.parse_args()

    DEBUG = options.debug
    VERBOSE = options.verbose
    SQUIET = options.scan_quiet
    SDEBUG = options.scan_debug
    SVERBOSE = options.scan_verbose
    NO_EXIT_CODE = options.no_exit_code

    if options.arachni_dir:
        print_debug('User supplied arachni directory "{0:s}"'.format(options.arachni_dir))
        update_arachni_dir(options.arachni_dir)

    print_debug(
        'DEBUG: {0}, VERBOSE: {1}, SDEBUG: {2}, SVERBOSE: {3}, SQUIET: {4}, NO_EXIT_CODE: {5}'.format(
            DEBUG, VERBOSE, SDEBUG, SVERBOSE, SQUIET, NO_EXIT_CODE))
    return args
|
|
|
|
|
def init_check(args):
    """Validate the CLI arguments and the on-disk environment.

    Exits with WEBAPP_SEC_NA_CODE when no endpoints file exists (the
    project has no security testing configured), errors out when any
    required arachni file is missing, and creates the report directory
    tree. Returns the endpoints file path.
    """
    print_verbose('Checking user supplied argument(s)')
    if len(args) > 1:
        print_error('Too many arguments! See usage with \'-h\'')
    if len(args) < 1:
        print_error('Missing arguments! See usage with \'-h\'')

    endpoints_path = args[0]
    print_debug('Checking if endpoints.json exists')
    if not check_file_exists(endpoints_path, required=False):
        # No endpoints file is not an error: security testing is simply N/A.
        print_verbose('No security testing set up for project, exiting...')
        sys.exit(WEBAPP_SEC_NA_CODE)

    check_required_path_exists(ARACHNI_PATH)

    for required_file in (ARACHNI_FPATH, ARACHNI_RFPATH, PROFILE_PATH):
        check_file_exists(required_file)

    for report_dir in (REPORT_DIR, REPORT_ORIG_DIR, REPORT_TMP_DIR):
        create_dir_if_not_exists(report_dir)

    return endpoints_path
|
|
|
|
|
def check_required_path_exists(directory_path):
    """Return True when directory_path exists; exit the script otherwise."""
    if os.path.exists(directory_path):
        print_debug('Required directory {0:s} exists.'.format(directory_path))
        return True
    # print_error terminates the script, so there is no False path.
    print_error('Missing required directory: {0:s}'.format(directory_path))
|
|
|
|
|
def check_file_exists(file_path, required=True):
    """Return True when file_path is an existing regular file.

    When missing: exit the script (via print_error) if required,
    otherwise log and return False.
    """
    file_is_present = os.path.exists(file_path) and os.path.isfile(file_path)
    if file_is_present:
        print_debug('File {0:s} exists.'.format(file_path))
        return True
    if required:
        # Terminates the script with SCRIPT_ERROR_CODE.
        print_error('Missing required file: {0:s}'.format(file_path))
    print_debug('File {0:s} does not exist.'.format(file_path))
    return False
|
|
|
|
|
def create_dir_if_not_exists(directory_path):
    """Create directory_path (including parents) unless it already exists."""
    if os.path.exists(directory_path):
        print_debug('Directory {0:s} already exists'.format(directory_path))
    else:
        os.makedirs(directory_path)
        print_debug('Directory {0:s} created'.format(directory_path))
|
|
|
|
|
def print_scan(msg):
    """Echo a scanner status line to stdout."""
    print('[SCANNER] ' + msg)
|
|
|
|
|
def print_debug(msg):
    """Echo a debug line, but only when the -d/--debug flag is set."""
    if not DEBUG:
        return
    print('[DEBUG] ' + msg)
|
|
|
|
|
def print_verbose(msg):
    """Echo a verbose line, but only when the -v/--verbose flag is set."""
    if not VERBOSE:
        return
    print('[VERBOSE] ' + msg)
|
|
|
|
|
def print_warn(msg):
    """Echo a warning line to stdout (always shown)."""
    print('[WARN] ' + msg)
|
|
|
|
|
def print_error(msg, err=SCRIPT_ERROR_CODE):
    """Echo an error line and terminate the script with exit code err."""
    print('[!ERROR!] ' + msg)
    sys.exit(err)
|
|
|
|
|
def call_arachni(cmd, argv):
    """Run an arachni executable with the given argument list.

    Returns the subprocess exit status; terminates the script via
    print_error when launching the command fails.
    """
    print_debug('Calling "{0:s}" with the following arguments: {1:s}'.format(cmd, argv))
    try:
        # List concatenation replaces the manual append loop.
        return call([cmd] + list(argv))
    # Was a bare `except:` which also swallowed SystemExit and
    # KeyboardInterrupt; Exception keeps the best-effort catch without that.
    except Exception:
        print_error(
            'Unexpected error when running the command "{0:s}" with the following arguments: {1:s}'.format(cmd, argv))
|
|
|
|
|
def get_target_url_and_paths(endpoints_list):
    """Derive the scan target and scope file from a list of endpoint URLs.

    The scheme+host of the first endpoint becomes the target url; the
    host-stripped endpoints are written one per line to EXTEND_PATHS_FILE
    so arachni can extend its scan scope with them.

    Returns a (target_url, extend_paths_file_path) tuple.
    """
    print_verbose('Getting target url and paths from multiple endpoints')
    joined_endpoints = "\n".join(endpoints_list)

    first_uri = urlparse.urlparse(endpoints_list[0])
    target_url = '{uri.scheme}://{uri.netloc}'.format(uri=first_uri)
    # Strip the host so only the paths remain in the scope file.
    joined_endpoints = joined_endpoints.replace(target_url, '')

    print_debug('Final endpoints string is:\n{0:s}'.format(joined_endpoints))
    print_verbose('Creating new file at {0:s} with endpoints string'.format(EXTEND_PATHS_FILE))
    with open(EXTEND_PATHS_FILE, 'w') as extend_paths_file:
        extend_paths_file.write(joined_endpoints)

    return target_url, EXTEND_PATHS_FILE
|
|
|
|
|
def call_arachni_scanner(endpoints_list, timestamp):
    """Launch an arachni scan against the configured endpoints.

    With a single endpoint it is scanned directly; with several, the
    common host is scanned and the scope is extended with the individual
    paths. Returns the scanner's exit status.
    """
    print_verbose('Calling arachni scanner')
    extend_paths_file = ''
    if len(endpoints_list) > 1:
        target_url, extend_paths_file = get_target_url_and_paths(endpoints_list)
    else:
        target_url = '{0:s}'.format(endpoints_list[0])

    argv = [
        target_url,
        '--profile-load-filepath={0:s}'.format(PROFILE_PATH),
        '--report-save-path={0:s}/{1:s}.afr'.format(REPORT_ORIG_DIR, timestamp),
        '--timeout={0:s}'.format(ARACHNI_TIMEOUT),
    ]

    # Map the script's output switches onto arachni's flags.
    if SQUIET:
        argv.append('--output-only-positives')
    if SVERBOSE:
        argv.append('--output-verbose')
    if SDEBUG:
        argv.append('--output-debug={0:d}'.format(SDEBUG_LEVEL))
    if extend_paths_file:
        argv.append('--scope-extend-paths={0:s}'.format(extend_paths_file))

    return call_arachni(ARACHNI_FPATH, argv)
|
|
|
|
|
def call_arachni_reporter(timestamp):
    """Convert the .afr scan result for `timestamp` into a JSON report."""
    print_verbose('Calling arachni reporter')
    afr_path = '{0:s}/{1:s}.afr'.format(REPORT_ORIG_DIR, timestamp)
    json_flag = '--report=json:outfile={0:s}/{1:s}.json'.format(REPORT_TMP_DIR, timestamp)
    return call_arachni(ARACHNI_RFPATH, [afr_path, json_flag])
|
|
|
|
|
def generate_timestamp():
    """Return the current local time as a 'YYYY-MM-DDTHH:MM:SS' string.

    Used both as the report file basename and as the junit 'timestamp'
    attribute.
    """
    print_verbose('Generating new timestamp')
    # strftime replaces the hand-rolled %-formatting of the individual
    # struct_time fields; the output format is identical.
    name = time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime())
    print_debug('Generated timestamp: {0:s}'.format(name))
    return name
|
|
|
|
|
def parse_json(json_file_path):
    """Load and return the JSON document stored at json_file_path."""
    print_debug('Opening file {0:s}'.format(json_file_path))
    with open(json_file_path, 'r') as json_file:
        print_debug('Parsing file as JSON')
        return json.load(json_file)
|
|
|
|
|
def get_field_from_json(field, json_obj):
    """Return the value stored under `field` (KeyError when absent)."""
    print_debug('Getting field {0:s} from json'.format(field))
    value = json_obj[field]
    return value
|
|
|
|
|
def get_endpoints(endpoints_file_path):
    """Parse the endpoints file and return its 'endpoints' list."""
    parsed_endpoints = parse_json(endpoints_file_path)
    print_verbose('Getting endpoints')
    return parsed_endpoints['endpoints']
|
|
|
|
|
def get_exit_code_from_issues(issues, delta_time):
    """Translate the scan outcome into the script's exit code.

    Warns when the scan hit the configured timeout. Returns 0 when -n
    was passed or no issues were found, ARACHNI_ERROR_CODE otherwise.
    """
    num_issues = len(issues)
    print_debug('{0:d} issues reported'.format(num_issues))
    print_debug('Scan time: {0:s}'.format(delta_time))

    # NOTE(review): assumes delta_time is a zero-padded 'HH:MM:SS' string,
    # so lexicographic comparison matches chronological order — confirm
    # against the arachni JSON report format.
    if delta_time >= ARACHNI_TIMEOUT:
        # Bug fix: the placeholder was {1:s} with only one format argument,
        # which raised IndexError whenever this warning fired.
        print_warn('Scan timed out. Maximum scan time is set to {0:s} (HH:MM:SS)'.format(ARACHNI_TIMEOUT))

    if NO_EXIT_CODE:
        return 0
    elif num_issues > 0:
        return ARACHNI_ERROR_CODE
    else:
        return 0
|
|
|
|
|
def cleanup_tmp():
    """Best-effort removal of every regular file in REPORT_TMP_DIR.

    Failures to delete a single file are printed and skipped so cleanup
    continues with the remaining files.
    """
    print_verbose('Cleaning up in {0:s}'.format(REPORT_TMP_DIR))

    for file_name in os.listdir(REPORT_TMP_DIR):
        file_path = os.path.join(REPORT_TMP_DIR, file_name)
        try:
            if os.path.isfile(file_path):
                os.unlink(file_path)
        # Narrowed from the legacy `except Exception, e`: unlink/isfile
        # raise OSError; keep the best-effort print-and-continue behavior.
        except OSError as e:
            print(e)
|
|
|
|
|
def get_issues_by_severity(issues):
    """Bucket issues into the four known severity levels.

    Raises KeyError if an issue reports a severity outside
    high/medium/low/informational.
    """
    print_verbose('Getting issues by severity')
    severity_buckets = {level: [] for level in ('high', 'medium', 'low', 'informational')}

    for issue in issues:
        severity_buckets[issue['severity']].append(issue)

    return severity_buckets
|
|
|
|
|
def get_heading(issue):
    """Build an underlined '<issue name> - <check name>' heading."""
    title = '{0:s} - {1:s}'.format(issue['name'], issue['check']['name'])
    underline = '-' * len(title)
    return '\n' + title + '\n' + underline
|
|
|
|
|
def get_section_header(header):
    """Frame a title in a '#' banner: edge line, '# title #', edge line."""
    banner_title = '# {0:s} #'.format(header)
    banner_edge = '#' * len(banner_title)
    return '\n'.join([banner_edge, banner_title, banner_edge])
|
|
|
|
|
def get_variations_for_issue(issue):
    """Render every variation of an issue as a text section.

    Each variation lists (when present) the injected value, the matched
    signature, the proof, the affected page url and the raw HTTP request
    headers.
    """
    print_debug('Getting variations for issue')
    var_text = ''
    # The trust flag lives on the issue, so it is identical for all variations.
    trusted_label = 'Trusted' if issue['trusted'] else 'Untrusted'

    for idx, var in enumerate(issue['variations'], start=1):
        var_text += '\n\nVariation {0:d} ({1:s})\n'.format(idx, trusted_label)

        if 'vector' in var and 'affected_input_value' in var['vector']:
            var_text += '* Injected: {0:s}\n'.format(var['vector']['affected_input_value'])
        if 'signature' in var:
            var_text += '* Signature: {0:s}\n'.format(var['signature'])
        if 'proof' in var:
            var_text += '* Proof: {0:s}\n'.format(var['proof'])

        var_text += '\n* Affected page: {0:s}\n'.format(var['page']['dom']['url'])

        if 'request' in var and 'headers_string' in var['request'] and var['request']['headers_string'] != '':
            var_text += '\n* HTTP request: \n{0:s}\n'.format(var['request']['headers_string'])

    return var_text
|
|
|
|
|
def get_description_for_issue(issue):
    """Assemble the full text description for one issue.

    Sections: heading + summary (severity, url, method, parameter),
    variations with proof, the issue description, references (paid
    services filtered out, Nordnet guideline appended) and remediation
    guidance when available.
    """
    print_debug('Getting description for issue')

    # Description header and summary of severity, trust and the affected url.
    description = '{0:s}\n\n'.format(get_heading(issue))
    description += '* Severity: {0:s}\n'.format(issue['severity'].upper())

    # Bug fix: the URL was read unconditionally while the other vector
    # fields were guarded, so an issue without a 'vector' key raised
    # KeyError here. All vector-derived lines are now guarded alike.
    if 'vector' in issue:
        if 'url' in issue['vector']:
            description += '* URL: {0:s}\n'.format(issue['vector']['url'])
        if 'method' in issue['vector']:
            description += '* Method: {0:s}\n'.format(issue['vector']['method'].upper())
        if 'affected_input_name' in issue['vector']:
            description += '* Parameter: {0:s}\n'.format(issue['vector']['affected_input_name'])

    description += '\n'

    # Variations of the issue with proof.
    description += '{0:s}{1:s}'.format(get_section_header('Variations'), get_variations_for_issue(issue))

    # Description if it exists
    if 'description' in issue:
        description += '{0:s}\n{1:s}\n'.format(get_section_header('Description'), issue['description'])

    # References if they exist
    ref_text = '\n'
    if 'references' in issue:
        references = issue['references']
        for key in references:
            if key.lower() not in PAID_REFS:  # Don't add references to paid services
                ref_text += '{0:s}: {1:s}\n'.format(key, references[key])

    # Include reference to Nordnet
    ref_text += '{0:s}: {1:s}\n'.format('Nordnet', NORDNET_REF)
    description += '{0:s}\n{1:s}\n'.format(get_section_header('References'), ref_text)

    # Remediation guidance if it exists
    if 'remedy_guidance' in issue:
        description += '{0:s}\n{1:s}\n'.format(get_section_header('Remediation Guidance'), issue['remedy_guidance'])

    return description
|
|
|
|
|
def get_issue_time(issue):
    """Sum the response times over all variations of an issue.

    Variations without a response or response time contribute nothing.
    """
    print_debug('Getting execution time for issue')
    # Renamed accumulator: the original local `time = 0` shadowed the
    # imported `time` module inside this function.
    total_time = 0
    for var in issue['variations']:
        if 'response' in var and 'time' in var['response']:
            total_time += var['response']['time']

    return total_time
|
|
|
|
|
def generate_junit_report(issues, timestamp, delta_time):
    """Write a JUnit-style XML report to JUNIT_REPORT_FPATH.

    One testsuite is emitted per severity level; every issue becomes a
    failing testcase whose failure text is the full issue description.
    A synthetic passing testcase is added to each suite so empty suites
    don't confuse report consumers.
    """
    print_verbose('Generating junit report from issues')
    issues_by_severity = get_issues_by_severity(issues)
    print_debug('Creating testsuites node')
    testsuites = ET.Element('testsuites')

    for severity, issue_list in issues_by_severity.items():
        num_issues = len(issue_list)
        print_verbose('Issues - {0:s}: {1:d}'.format(severity, num_issues))
        testsuite = ET.SubElement(testsuites, 'testsuite', {
            'name': 'security.severity.{0:s}'.format(severity),
            'errors': '0',
            'skipped': '0',
            'tests': '{0:d}'.format(num_issues + 1),  # +1 for the default testcase below
            'failures': '{0:d}'.format(num_issues),
            'time': delta_time,
            'timestamp': timestamp,
        })

        # Add default testcase in each suite to avoid errors in the report
        ET.SubElement(testsuite, 'testcase',
                      {'classname': 'GLOBAL', 'name': 'Ran all tests in testsuite', 'time': '0'})

        for issue in issue_list:
            testcase = ET.SubElement(testsuite, 'testcase', {
                'classname': issue['check']['shortname'].replace(' ', '_').upper(),
                'name': issue['vector']['url'],
                'time': '{0:f}'.format(get_issue_time(issue)),
            })
            failure = ET.SubElement(testcase, 'failure', {'message': issue['check']['name']})
            failure.text = get_description_for_issue(issue)

    ET.ElementTree(testsuites).write(JUNIT_REPORT_FPATH, encoding='utf-8')
|
|
|
|
|
if __name__ == "__main__":
    # Scan pipeline: parse flags -> validate environment -> scan ->
    # generate reports -> derive exit code -> clean up.
    print_debug('Parsing args')
    cli_args = parse_args()

    endpoints_file = init_check(cli_args)
    endpoints = get_endpoints(endpoints_file)
    scan_timestamp = generate_timestamp()

    print_scan('Launching scanner')
    if call_arachni_scanner(endpoints, scan_timestamp):
        print_error('That did not work.... Please see the output above')

    print_scan('Scan complete. Generating report')
    if call_arachni_reporter(scan_timestamp):
        print_error('That did not work.... Please see the output above')

    print_scan('Report generated. Getting issues and time delta')
    scan_report = parse_json('{0:s}/{1:s}.json'.format(REPORT_TMP_DIR, scan_timestamp))
    found_issues = get_field_from_json('issues', scan_report)
    scan_delta = get_field_from_json('delta_time', scan_report)

    generate_junit_report(found_issues, scan_timestamp, scan_delta)

    script_exit_code = get_exit_code_from_issues(found_issues, scan_delta)

    cleanup_tmp()

    print_verbose('Exiting ({0:d})'.format(script_exit_code))
    sys.exit(script_exit_code)