Skip to content

Instantly share code, notes, and snippets.

@abrostrom
Last active April 9, 2017 19:10
Show Gist options
  • Save abrostrom/74859c3e863c3121ba96 to your computer and use it in GitHub Desktop.
Save abrostrom/74859c3e863c3121ba96 to your computer and use it in GitHub Desktop.
A script developed to wrap arachni vulnerability scanner. Used to integrate security in the Agile development process

CI scanner

A script that wraps the Arachni vulnerability scanner. To be used in the CITEST environment

The script is optimized to run with Python 2.6 and Arachni v2.0.

$ python scan.py -h
Usage: scan.py [options] path/to/endpoints.json

Scan a host for vulnerabilities. (Must be placed in arachni directory if "-a"
is not supplied)

Options:
  -h, --help            show this help message and exit
  -d, --debug           turns on debug output
  -v, --verbose         turns on verbose output
  -q, --scan_quiet      scanner will only output vulnerabilities
  -n, --no_exit_code    returns 0 even if issues were found
  -b, --scan_debug      turns on debug output for scanner
  -w, --scan_verbose    turns on verbose output for scanner
  -a ARACHNI_DIR, --arachni_dir_location=ARACHNI_DIR
                        point to the arachni directory location

To set up your project for scanning you need to add a file called endpoints.json. This file should be located in [project_root]/test/security/ci/endpoints.json. It should contain an object with one key (endpoints) and a value that is a list of endpoints in the CITEST environment.

Example:

{
    "endpoints": [
        "http://webapp-now.ci.nordnet.se/webapp-now/resources/widget/"
    ]
}

Arachni will use the supplied profile profiles/ci_scanner.profile.afp when scanning. Tweak this file to change the behaviour of the scan. For a list of options refer to the documentation of the CLI.

A hard timeout for each scan is set to five minutes, longer scans will be aborted and cleaned up.

Note: This section is not applicable if the script was called with options -n or --no_exit_code!

The script scan.py will exit with different codes based on the results of the scan:

Code Explaination
0 Scan was successfull, and no issues were found.
1 Scan was successfull, and at least one issue was found.
2 Something went wrong while executing the script. See output for details.

The script will also output a report in JUnit format with any issues found. This report can be found in the root of the project.

---
datastore:
report_path:
audit:
parameter_values: true
exclude_vector_patterns: []
include_vector_patterns: []
link_templates: []
links: true
forms: true
session: {}
input:
values: {}
default_values:
"(?i-mx:name)": ci_scanner_name
"(?i-mx:user)": ci_scanner_user
"(?i-mx:usr)": ci_scanner_user
"(?i-mx:pass)": ci_scanner_password
"(?i-mx:txt)": ci_scanner_text
"(?i-mx:num)": '132'
"(?i-mx:amount)": '100'
"(?i-mx:mail)": ci_scanner@ci_scanner-notarealemailaddress.com
"(?i-mx:account)": '12'
"(?i-mx:id)": '1'
without_defaults: false
force: false
http:
user_agent: ci_scanner
request_timeout: 10000
request_redirect_limit: 5
request_concurrency: 20
request_queue_size: 500
response_max_size: 500000
request_headers: {}
cookies: {}
browser_cluster:
pool_size: 6
job_timeout: 15
worker_time_to_live: 100
ignore_images: false
screen_width: 1600
screen_height: 1200
scope:
redundant_path_patterns: {}
dom_depth_limit: 5
exclude_path_patterns: []
exclude_content_patterns: []
include_path_patterns: []
restrict_paths: []
extend_paths: []
url_rewrites: {}
checks:
- sql_injection_differential
- sql_injection
- sql_injection_timing
- no_sql_injection
- no_sql_injection_differential
- xss_dom_script_context
- xss_path
- xss_event
- xss_dom
- xss_dom_inputs
- xss_script_context
- xss
- xss_tag
- file_inclusion
- path_traversal
- directory_listing
- rfi
- os_cmd_injection
- os_cmd_injection_timing
- http_put
- unvalidated_redirect
- unvalidated_redirect_dom
- response_splitting
- code_injection
- code_injection_timing
- code_injection_php_input_wrapper
- source_code_disclosure
- session_fixation
- csrf
platforms: []
plugins: {}
no_fingerprinting: false
authorized_by:
# coding=utf-8
import optparse
import urlparse
import time
import json
import sys
import os
from subprocess import call
import xml.etree.cElementTree as ET
_author_ = 'Andreas Broström'
DEBUG = False
VERBOSE = False
SDEBUG = False
SVERBOSE = False
SQUIET = False
NO_EXIT_CODE = False
SDEBUG_LEVEL = 3
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
ARACHNI_DIR = SCRIPT_DIR # Default's to scripts path
REPORT_DIR = './scan_reports'
REPORT_ORIG_DIR = REPORT_DIR + '/original'
REPORT_HOST_DIR = REPORT_DIR + '/html'
REPORT_TMP_DIR = REPORT_DIR + '/tmp'
EXTEND_PATHS_FILE = REPORT_TMP_DIR + '/extend_paths.txt'
JUNIT_REPORT_FPATH = './TEST-security-ci_scanner.xml'
ARACHNI_TIMEOUT = '00:05:00' # HH:MM:SS
PROFILE_NAME = 'ci_scanner.profile.afp'
PROFILE_PATH = SCRIPT_DIR + '/profiles/' + PROFILE_NAME
ARACHNI_PATH = ARACHNI_DIR + '/bin'
ARACHNI_FPATH = ARACHNI_PATH + '/arachni'
ARACHNI_RFPATH = ARACHNI_PATH + '/arachni_reporter'
# Exit codes (Used if not '-n' is passed)
ARACHNI_ERROR_CODE = 1
SCRIPT_ERROR_CODE = 2
WEBAPP_SEC_NA_CODE = 0 # If there is no "endpoints"-file we skip security testing
PAID_REFS = ['secunia']
NORDNET_REF = 'Look for the latest "Secure Coding Guideline" at https://z-net.pilen.nordnet.se/it-guidelines/'
def update_arachni_dir(arachni_dir):
global ARACHNI_DIR, ARACHNI_PATH, ARACHNI_FPATH, ARACHNI_RFPATH
ARACHNI_DIR = arachni_dir
ARACHNI_PATH = ARACHNI_DIR + '/bin'
ARACHNI_FPATH = ARACHNI_PATH + '/arachni'
ARACHNI_RFPATH = ARACHNI_PATH + '/arachni_reporter'
def parse_args():
global DEBUG, VERBOSE, SQUIET, SDEBUG, SVERBOSE, NO_EXIT_CODE
usage = "usage: %prog [options] path/to/endpoints.json"
parser = optparse.OptionParser(usage=usage,
description='Scan a host for vulnerabilities. (Must be placed in arachni directory' +
' if "-a" is not supplied)')
parser.add_option('-d', '--debug', action='store_true', default=False,
help='turns on debug output')
parser.add_option('-v', '--verbose', action='store_true', default=False,
help='turns on verbose output')
parser.add_option('-q', '--scan_quiet', action='store_true', default=False,
help='scanner will only output vulnerabilities')
parser.add_option('-n', '--no_exit_code', action='store_true', default=False,
help='returns 0 even if issues were found')
parser.add_option('-b', '--scan_debug', action='store_true', default=False,
help='turns on debug output for scanner')
parser.add_option('-w', '--scan_verbose', action='store_true', default=False,
help='turns on verbose output for scanner')
parser.add_option('-a', '--arachni_dir_location', type="string", dest='arachni_dir',
help='point to the arachni directory location')
(options, args) = parser.parse_args()
DEBUG = options.debug
VERBOSE = options.verbose
SQUIET = options.scan_quiet
SDEBUG = options.scan_debug
SVERBOSE = options.scan_verbose
NO_EXIT_CODE = options.no_exit_code
if options.arachni_dir:
print_debug('User supplied arachni directory "{0:s}"'.format(options.arachni_dir))
update_arachni_dir(options.arachni_dir)
print_debug(
'DEBUG: {0}, VERBOSE: {1}, SDEBUG: {2}, SVERBOSE: {3}, SQUIET: {4}, NO_EXIT_CODE: {5}'.format(DEBUG, VERBOSE,
SDEBUG, SVERBOSE,
SQUIET,
NO_EXIT_CODE))
return args
def init_check(args):
print_verbose('Checking user supplied argument(s)')
if len(args) > 1:
print_error('Too many arguments! See usage with \'-h\'')
if len(args) < 1:
print_error('Missing arguments! See usage with \'-h\'')
endpoints_path = args[0]
print_debug('Checking if endpoints.json exists')
if not check_file_exists(endpoints_path, required=False):
print_verbose('No security testing set up for project, exiting...')
sys.exit(WEBAPP_SEC_NA_CODE)
check_required_path_exists(ARACHNI_PATH)
check_file_exists(ARACHNI_FPATH)
check_file_exists(ARACHNI_RFPATH)
check_file_exists(PROFILE_PATH)
create_dir_if_not_exists(REPORT_DIR)
create_dir_if_not_exists(REPORT_ORIG_DIR)
create_dir_if_not_exists(REPORT_TMP_DIR)
return endpoints_path
def check_required_path_exists(directory_path):
if not os.path.exists(directory_path):
print_error('Missing required directory: {0:s}'.format(directory_path))
else:
print_debug('Required directory {0:s} exists.'.format(directory_path))
return True
def check_file_exists(file_path, required=True):
if os.path.exists(file_path) and os.path.isfile(file_path):
print_debug('File {0:s} exists.'.format(file_path))
return True
elif required:
print_error('Missing required file: {0:s}'.format(file_path))
else:
print_debug('File {0:s} does not exist.'.format(file_path))
return False
def create_dir_if_not_exists(directory_path):
if not os.path.exists(directory_path):
os.makedirs(directory_path)
print_debug('Directory {0:s} created'.format(directory_path))
else:
print_debug('Directory {0:s} already exists'.format(directory_path))
def print_scan(msg):
print '[SCANNER] {0:s}'.format(msg)
def print_debug(msg):
if DEBUG:
print '[DEBUG] {0:s}'.format(msg)
def print_verbose(msg):
if VERBOSE:
print '[VERBOSE] {0:s}'.format(msg)
def print_warn(msg):
print '[WARN] {0:s}'.format(msg)
def print_error(msg, err=SCRIPT_ERROR_CODE):
print '[!ERROR!] {0:s}'.format(msg)
sys.exit(err)
def call_arachni(cmd, argv):
print_debug('Calling "{0:s}" with the following arguments: {1:s}'.format(cmd, argv))
try:
args = [cmd]
for arg in argv:
args.append(arg)
return call(args)
except:
print_error(
'Unexpected error when running the command "{0:s}" with the following arguments: {1:s}'.format(cmd, argv))
def get_target_url_and_paths(endpoints_list):
print_verbose('Getting target url and paths from multiple endpoints')
endpoints_string = "\n".join(endpoints_list)
parsed_uri = urlparse.urlparse(endpoints_list[0])
target_url = '{uri.scheme}://{uri.netloc}'.format(uri=parsed_uri)
endpoints_string = endpoints_string.replace(target_url, '') # Remove host from endpoints
print_debug('Final endpoints string is:\n{0:s}'.format(endpoints_string))
print_verbose('Creating new file at {0:s} with endpoints string'.format(EXTEND_PATHS_FILE))
with open(EXTEND_PATHS_FILE, 'w') as extend_paths_file:
extend_paths_file.write(endpoints_string)
return target_url, EXTEND_PATHS_FILE
def call_arachni_scanner(endpoints_list, timestamp):
print_verbose('Calling arachni scanner')
extend_paths_file = ''
if len(endpoints_list) > 1:
target_url, extend_paths_file = get_target_url_and_paths(endpoints_list)
else:
target_url = '{0:s}'.format(endpoints_list[0])
argv = [target_url,
'--profile-load-filepath={0:s}'.format(PROFILE_PATH),
'--report-save-path={0:s}/{1:s}.afr'.format(REPORT_ORIG_DIR, timestamp),
'--timeout={0:s}'.format(ARACHNI_TIMEOUT)]
if SQUIET:
argv.append('--output-only-positives')
if SVERBOSE:
argv.append('--output-verbose')
if SDEBUG:
argv.append('--output-debug={0:d}'.format(SDEBUG_LEVEL))
if extend_paths_file:
argv.append('--scope-extend-paths={0:s}'.format(extend_paths_file))
return call_arachni(ARACHNI_FPATH, argv)
def call_arachni_reporter(timestamp):
print_verbose('Calling arachni reporter')
argv = ['{0:s}/{1:s}.afr'.format(REPORT_ORIG_DIR, timestamp),
'--report=json:outfile={0:s}/{1:s}.json'.format(REPORT_TMP_DIR, timestamp)]
return call_arachni(ARACHNI_RFPATH, argv)
def generate_timestamp():
print_verbose('Generating new timestamp')
now = time.localtime()
name = '%d-%02d-%02dT%02d:%02d:%02d' % (now.tm_year, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min, now.tm_sec)
print_debug('Generated timestamp: {0:s}'.format(name))
return name
def parse_json(json_file_path):
print_debug('Opening file {0:s}'.format(json_file_path))
with open(json_file_path, 'r') as json_file:
print_debug('Parsing file as JSON')
json_obj = json.load(json_file)
return json_obj
def get_field_from_json(field, json_obj):
print_debug('Getting field {0:s} from json'.format(field))
return json_obj[field]
def get_endpoints(endpoints_file_path):
endpoints_json = parse_json(endpoints_file_path)
print_verbose('Getting endpoints')
return endpoints_json['endpoints']
def get_exit_code_from_issues(issues, delta_time):
num_issues = len(issues)
print_debug('{0:d} issues reported'.format(num_issues))
print_debug('Scan time: {0:s}'.format(delta_time))
if delta_time >= ARACHNI_TIMEOUT:
print_warn('Scan timed out. Maximum scan time is set to {1:s} (HH:MM:SS)'.format(ARACHNI_TIMEOUT))
if NO_EXIT_CODE:
return 0
elif num_issues > 0:
return ARACHNI_ERROR_CODE
else:
return 0
def cleanup_tmp():
print_verbose('Cleaning up in {0:s}'.format(REPORT_TMP_DIR))
for file_name in os.listdir(REPORT_TMP_DIR):
file_path = os.path.join(REPORT_TMP_DIR, file_name)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception, e:
print e
def get_issues_by_severity(issues):
print_verbose('Getting issues by severity')
issues_by_severity = {
'high': [],
'medium': [],
'low': [],
'informational': []
}
for issue in issues:
severity = issue['severity']
issues_by_severity[severity].append(issue)
return issues_by_severity
def get_heading(issue):
header = '{0:s} - {1:s}'.format(issue['name'], issue['check']['name'])
return '\n{0:s}\n{1:s}'.format(header, '-' * len(header))
def get_section_header(header):
header = '# {0:s} #'.format(header)
header_divider = '#' * len(header)
return '{0:s}\n{1:s}\n{0:s}'.format(header_divider, header)
def get_variations_for_issue(issue):
print_debug('Getting variations for issue')
variations = issue['variations']
var_text = ''
for idx, var in enumerate(variations):
trusted = 'Trusted' if issue['trusted'] else 'Untrusted'
var_text += '\n\nVariation {0:d} ({1:s})\n'.format(idx + 1, trusted)
if 'vector' in var and 'affected_input_value' in var['vector']:
var_text += '* Injected: {0:s}\n'.format(var['vector']['affected_input_value'])
if 'signature' in var:
var_text += '* Signature: {0:s}\n'.format(var['signature'])
if 'proof' in var:
var_text += '* Proof: {0:s}\n'.format(var['proof'])
var_text += '\n* Affected page: {0:s}\n'.format(var['page']['dom']['url'])
if 'request' in var and 'headers_string' in var['request'] and var['request']['headers_string'] != '':
var_text += '\n* HTTP request: \n{0:s}\n'.format(var['request']['headers_string'])
return var_text
def get_description_for_issue(issue):
print_debug('Getting description for issue')
# Description header and usmmary of severity, trust and the affected url.
description = '{0:s}\n\n'.format(get_heading(issue))
description += '* Severity: {0:s}\n'.format(issue['severity'].upper())
description += '* URL: {0:s}\n'.format(issue['vector']['url'])
if 'vector' in issue:
if 'method' in issue['vector']:
description += '* Method: {0:s}\n'.format(issue['vector']['method'].upper())
if 'affected_input_name' in issue['vector']:
description += '* Parameter: {0:s}\n'.format(issue['vector']['affected_input_name'])
description += '\n'
# Variations of the issue with proof.
description += '{0:s}{1:s}'.format(get_section_header('Variations'), get_variations_for_issue(issue))
# Description if it exists
if 'description' in issue:
description += '{0:s}\n{1:s}\n'.format(get_section_header('Description'), issue['description'])
# References if they exist
ref_text = '\n'
if 'references' in issue:
references = issue['references']
for key in references:
if key.lower() not in PAID_REFS: # Don't add references to paid services
ref_text += '{0:s}: {1:s}\n'.format(key, references[key])
# Include reference to Nordnet
ref_text += '{0:s}: {1:s}\n'.format('Nordnet', NORDNET_REF)
description += '{0:s}\n{1:s}\n'.format(get_section_header('References'), ref_text)
# Remediation guidance if it exists
if 'remedy_guidance' in issue:
description += '{0:s}\n{1:s}\n'.format(get_section_header('Remediation Guidance'), issue['remedy_guidance'])
return description
def get_issue_time(issue):
print_debug('Getting execution time for issue')
variations = issue['variations']
time = 0
for var in variations:
if 'response' in var and 'time' in var['response']:
time += var['response']['time']
return time
def generate_junit_report(issues, timestamp, delta_time):
print_verbose('Generating junit report from issues')
issues_by_severity = get_issues_by_severity(issues)
print_debug('Creating testsuites node')
testsuites = ET.Element('testsuites')
for severity in issues_by_severity.keys():
issue_list = issues_by_severity[severity]
num_issues = len(issue_list)
print_verbose('Issues - {0:s}: {1:d}'.format(severity, num_issues))
attributes_ts = {
'name': 'security.severity.{0:s}'.format(severity),
'errors': '0',
'skipped': '0',
'tests': '{0:d}'.format(num_issues + 1), # This ´1´ is tied to the default testcase below.
'failures': '{0:d}'.format(num_issues),
'time': delta_time,
'timestamp': timestamp
}
testsuite = ET.SubElement(testsuites, 'testsuite', attributes_ts)
# Add default testcase in each suite to avoid errors in the report
ET.SubElement(testsuite, 'testcase', {'classname': 'GLOBAL', 'name': 'Ran all tests in testsuite', 'time': '0'})
for issue in issue_list:
attributes_tc = {
'classname': issue['check']['shortname'].replace(' ', '_').upper(),
'name': issue['vector']['url'],
'time': '{0:f}'.format(get_issue_time(issue))
}
testcase = ET.SubElement(testsuite, 'testcase', attributes_tc)
check = issue['check']
failure = ET.SubElement(testcase, 'failure', {'message': check['name']})
failure.text = get_description_for_issue(issue)
tree = ET.ElementTree(testsuites)
tree.write(JUNIT_REPORT_FPATH, encoding='utf-8')
if __name__ == "__main__":
print_debug('Parsing args')
args = parse_args()
endpoints_file_path = init_check(args)
endpoints_list = get_endpoints(endpoints_file_path)
timestamp = generate_timestamp()
print_scan('Launching scanner')
if call_arachni_scanner(endpoints_list, timestamp):
print_error('That did not work.... Please see the output above')
print_scan('Scan complete. Generating report')
if call_arachni_reporter(timestamp):
print_error('That did not work.... Please see the output above')
print_scan('Report generated. Getting issues and time delta')
report_json = parse_json('{0:s}/{1:s}.json'.format(REPORT_TMP_DIR, timestamp))
issues = get_field_from_json('issues', report_json)
delta_time = get_field_from_json('delta_time', report_json)
generate_junit_report(issues, timestamp, delta_time)
exit_code = get_exit_code_from_issues(issues, delta_time)
cleanup_tmp()
print_verbose('Exiting ({0:d})'.format(exit_code))
sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment