Skip to content

Instantly share code, notes, and snippets.

@perfecto25
Last active August 10, 2021 11:05
Show Gist options
  • Save perfecto25/dc77aaa951f473f33e59e9c99e2a53cf to your computer and use it in GitHub Desktop.
Save perfecto25/dc77aaa951f473f33e59e9c99e2a53cf to your computer and use it in GitHub Desktop.
Nessus API scripts
#!/usr/bin/env python
# coding=utf-8
# GET NESSUS CSV
# gets Nessus reports as CSV export
import sys
import os
import glob
from ast import literal_eval
import nessus
config = 'nessus_config.json'
def dictator(data, path=None, default=None, checknone=False):
'''
The Dictator function takes a dictionary or JSON data and returns value for a specific key.
If dictator doesnt find a value for a key, or the data is missing the key, the return value is
either None or whatever fallback value you provide as default="My Default Fallback value".
Dictator is polite with Exception errors commonly encountered when parsing large Dictionaries/JSONs
Usage:
get a value from a Dictionary key
> dictator(data, "employees[5].firstName")
with custom value on lookup failure,
> dictator(data, "employees[5].firstName", default="No employee found")
pass a parameter
> dictator(data, "company.name.{}".format(my_company))
lookup a 3rd element of List Json, on second key, lookup index=5
> dictator(data, "3.first.second[5]")
lookup a nested list of lists
> dictator(data, "0.first[1].2.second.third[0].2"
check if return value is None, if it is, raise a Value error
> dictator(data, "some.key.value", checknone=True)
> error Value Error
'''
import json
if path is None or path == '':
return json.dumps(data)
value = None
keys = path.split(".")
# reset path
path = None
# if 1st key is a list index
if all(char.isdigit() for char in keys[0]):
path = '['+keys[0]+']'
# remove 1st key from key list
keys.pop(0)
# build proper path
for key in keys:
# check if key is a list
if key.endswith(']'):
temp = key.split('[')
key = ''.join(temp[0])
index = int(temp[1].strip(']'))
if path is None:
path = "['"+key+"']"+"["+str(index)+"]"
else:
path = path+"['"+key+"']"+"["+str(index)+"]"
else:
if path is None:
path = "['"+key+"']"
else:
# check if key is an index
if key.isdigit() is True:
path = path + "["+key+"]"
else:
path = path + "['"+key+"']"
lookup = 'data'+path
try:
value = eval(lookup)
if value is None:
value = default
except (KeyError, ValueError, IndexError, TypeError) as err:
value = default
finally:
if checknone:
if not value:
raise ValueError('missing value for %s' % path)
return value
def run_get_nessus_csv():
''' download Nessus scan results as CSV '''
# set params
nessus.export_format = 'csv'
nessus.save_path = pluginpath + '/data/' + nessus.export_format
# cleanup old files
for filename in glob.glob(nessus.save_path + '/*'):
try:
os.remove(filename)
except:
print('Failed to delete csv file %s' % filename)
# cycle through all Nessus servers, get scan
servers = dictator(config, 'nessus.servers', checknone=True)
for server in list(servers):
host = dictator(config, 'nessus.servers.{}.host'.format(server), checknone=True)
port = dictator(config, 'nessus.servers.{}.port'.format(server), default=80)
username = dictator(config, 'nessus.servers.{}.username'.format(server), checknone=True)
pw = dictator(config, 'nessus.servers.{}.pw'.format(server), checknone=True)
nessus.url = 'https://' + host + ':' + str(port)
nessus.server = server
# Login to Nessus
nessus.login = {'username': username, 'password': pw}
data = nessus.connect('POST', '/session', data=nessus.login)
nessus.token = data['token']
# get Trash folder ID
trash_id = nessus.get_trash_id()
# get all scans on Nessus server
all_scans = nessus.connect('GET', '/scans')
if not all_scans['scans']:
print('No scans detected on Nessus scanner %s' % nessus.url)
sys.exit()
# generate export - summary and by host
for scan in all_scans['scans']:
if scan['enabled'] == True and scan['folder_id'] != trash_id:
print('processing scan: {0}'.format(scan['name']))
# chapters: (vuln_hosts_summary, vuln_by_host, compliance_exec, remediations, vuln_by_plugin, compliance)
nessus.get_report(scan['name'], scan['id'], 'vuln_by_host')
print("===================== END OF RUN ======================")
if __name__ == "__main__":
try:
run_get_nessus_csv()
except Exception as e:
print(str(e))
raise
# NESSUS MAIN
import requests
import json
import sys
import os
import time
import re
# disable SSL warnings
requests.packages.urllib3.disable_warnings()
# initialize params from parent
url = ''
login = ''
token = ''
export_format = ''
server = ''
verify = False
save_path = ''
#=====================================================#
def build_url(resource):
return '{0}{1}'.format(url, resource)
#=====================================================#
def connect(method, resource, data=None):
headers = {'X-Cookie': 'token={0}'.format(token), 'content-type': 'application/json'}
data = json.dumps(data)
try:
if method == 'GET':
r = requests.get(build_url(resource), data=data, headers=headers, verify=False)
elif method == 'POST':
r = requests.post(build_url(resource), data=data, headers=headers, verify=False)
elif method == 'PUT':
r = requests.put(build_url(resource), data=data, headers=headers, verify=False)
except requests.exceptions.RequestException as e: # This is the correct syntax
print(str(e) + ": " + build_url(resource))
sys.exit(1)
if r.status_code not in range(200,206):
e = r.json()
print('%s error connecting to %s, HTTP status code: %s' % (__name__, url, r.status_code))
sys.exit(1)
if 'download' in resource:
return r.content
else:
return r.json()
#=====================================================#
def export_status(scan_id, file_id):
data = connect('GET', '/scans/{0}/export/{1}/status'.format(scan_id, file_id))
return data['status'] == 'ready'
#=====================================================#
def get_last_history_index(history):
count = 0
for hist in history:
count = count+1
return count-1
#=====================================================#
def get_last_history(scan_id):
data = connect('GET', '/scans/{0}'.format(scan_id))
if data['history']:
return data['history'][get_last_history_index(data['history'])]['history_id']
#=====================================================#
def get_trash_id():
''' If any report is moved to Trash folder in Nessus, skip it '''
data = connect('GET', '/scans')
# check Trash id
for folder in data['folders']:
if folder['name'] == 'Trash':
return folder['id']
#=====================================================#
def get_report(scan_name, scan_id, chapter):
''' find report, if enabled and has scan history then download as CSV '''
# get Scan ID
data = connect('GET', '/scans')
# get File ID
last_history = get_last_history(scan_id)
if last_history:
fmt = {"format": export_format, "chapters": chapter, "history_id": "{0}".format(last_history)}
data = connect('POST', '/scans/{0}/export'.format(scan_id), data=fmt)
file_id = data['file']
else:
print('scan {0} ID:{1} does not have a scan history, skipping..').format(scan_name, scan_id)
raise Exception("no scan history")
while export_status(scan_id, file_id) is False:
time.sleep(5)
# Save scan file
if not(os.path.exists(save_path)):
try:
os.makedirs(save_path)
except Exception, e:
print("Failed to create save path: %s" % e)
data = connect('GET', '/scans/{0}/export/{1}/download'.format(scan_id, file_id))
# format Scan export as "NessusEnv_ScanName.format"
scan_name = re.sub('/[!@#$%^&*()_+<>]', '_', scan_name)
tmp_name = server.upper()+'_'+scan_name.replace(' ', '_').lower()+'_'+chapter+'.'+export_format
file_name = tmp_name.format(scan_name.encode('utf-8'), file_id, export_format)
# remove bad characters
file_name = re.sub('[!@#$%^&*:;\\\~()?/_+<>=,]', '_', file_name).replace('__', '_').replace('__', '_')
file_name = re.sub('_vuln_by_host', '', file_name)
print('processing scan: %s' % chapter.upper())
report_file = os.path.join(save_path, file_name)
print('saving report to: %s' % report_file)
with open(report_file, 'w') as f:
f.write(data)
print('END OF SCAN %s' % chapter.upper())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment