Last active
August 10, 2021 11:05
-
-
Save perfecto25/dc77aaa951f473f33e59e9c99e2a53cf to your computer and use it in GitHub Desktop.
Nessus API scripts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
# GET NESSUS CSV | |
# gets Nessus reports as CSV export | |
import sys | |
import os | |
import glob | |
from ast import literal_eval | |
import nessus | |
config = 'nessus_config.json' | |
def dictator(data, path=None, default=None, checknone=False): | |
''' | |
The Dictator function takes a dictionary or JSON data and returns value for a specific key. | |
If dictator doesnt find a value for a key, or the data is missing the key, the return value is | |
either None or whatever fallback value you provide as default="My Default Fallback value". | |
Dictator is polite with Exception errors commonly encountered when parsing large Dictionaries/JSONs | |
Usage: | |
get a value from a Dictionary key | |
> dictator(data, "employees[5].firstName") | |
with custom value on lookup failure, | |
> dictator(data, "employees[5].firstName", default="No employee found") | |
pass a parameter | |
> dictator(data, "company.name.{}".format(my_company)) | |
lookup a 3rd element of List Json, on second key, lookup index=5 | |
> dictator(data, "3.first.second[5]") | |
lookup a nested list of lists | |
> dictator(data, "0.first[1].2.second.third[0].2" | |
check if return value is None, if it is, raise a Value error | |
> dictator(data, "some.key.value", checknone=True) | |
> error Value Error | |
''' | |
import json | |
if path is None or path == '': | |
return json.dumps(data) | |
value = None | |
keys = path.split(".") | |
# reset path | |
path = None | |
# if 1st key is a list index | |
if all(char.isdigit() for char in keys[0]): | |
path = '['+keys[0]+']' | |
# remove 1st key from key list | |
keys.pop(0) | |
# build proper path | |
for key in keys: | |
# check if key is a list | |
if key.endswith(']'): | |
temp = key.split('[') | |
key = ''.join(temp[0]) | |
index = int(temp[1].strip(']')) | |
if path is None: | |
path = "['"+key+"']"+"["+str(index)+"]" | |
else: | |
path = path+"['"+key+"']"+"["+str(index)+"]" | |
else: | |
if path is None: | |
path = "['"+key+"']" | |
else: | |
# check if key is an index | |
if key.isdigit() is True: | |
path = path + "["+key+"]" | |
else: | |
path = path + "['"+key+"']" | |
lookup = 'data'+path | |
try: | |
value = eval(lookup) | |
if value is None: | |
value = default | |
except (KeyError, ValueError, IndexError, TypeError) as err: | |
value = default | |
finally: | |
if checknone: | |
if not value: | |
raise ValueError('missing value for %s' % path) | |
return value | |
def run_get_nessus_csv(): | |
''' download Nessus scan results as CSV ''' | |
# set params | |
nessus.export_format = 'csv' | |
nessus.save_path = pluginpath + '/data/' + nessus.export_format | |
# cleanup old files | |
for filename in glob.glob(nessus.save_path + '/*'): | |
try: | |
os.remove(filename) | |
except: | |
print('Failed to delete csv file %s' % filename) | |
# cycle through all Nessus servers, get scan | |
servers = dictator(config, 'nessus.servers', checknone=True) | |
for server in list(servers): | |
host = dictator(config, 'nessus.servers.{}.host'.format(server), checknone=True) | |
port = dictator(config, 'nessus.servers.{}.port'.format(server), default=80) | |
username = dictator(config, 'nessus.servers.{}.username'.format(server), checknone=True) | |
pw = dictator(config, 'nessus.servers.{}.pw'.format(server), checknone=True) | |
nessus.url = 'https://' + host + ':' + str(port) | |
nessus.server = server | |
# Login to Nessus | |
nessus.login = {'username': username, 'password': pw} | |
data = nessus.connect('POST', '/session', data=nessus.login) | |
nessus.token = data['token'] | |
# get Trash folder ID | |
trash_id = nessus.get_trash_id() | |
# get all scans on Nessus server | |
all_scans = nessus.connect('GET', '/scans') | |
if not all_scans['scans']: | |
print('No scans detected on Nessus scanner %s' % nessus.url) | |
sys.exit() | |
# generate export - summary and by host | |
for scan in all_scans['scans']: | |
if scan['enabled'] == True and scan['folder_id'] != trash_id: | |
print('processing scan: {0}'.format(scan['name'])) | |
# chapters: (vuln_hosts_summary, vuln_by_host, compliance_exec, remediations, vuln_by_plugin, compliance) | |
nessus.get_report(scan['name'], scan['id'], 'vuln_by_host') | |
print("===================== END OF RUN ======================") | |
if __name__ == "__main__": | |
try: | |
run_get_nessus_csv() | |
except Exception as e: | |
print(str(e)) | |
raise | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NESSUS MAIN | |
import requests | |
import json | |
import sys | |
import os | |
import time | |
import re | |
# disable SSL warnings | |
requests.packages.urllib3.disable_warnings() | |
# initialize params from parent | |
url = '' | |
login = '' | |
token = '' | |
export_format = '' | |
server = '' | |
verify = False | |
save_path = '' | |
#=====================================================# | |
def build_url(resource): | |
return '{0}{1}'.format(url, resource) | |
#=====================================================# | |
def connect(method, resource, data=None): | |
headers = {'X-Cookie': 'token={0}'.format(token), 'content-type': 'application/json'} | |
data = json.dumps(data) | |
try: | |
if method == 'GET': | |
r = requests.get(build_url(resource), data=data, headers=headers, verify=False) | |
elif method == 'POST': | |
r = requests.post(build_url(resource), data=data, headers=headers, verify=False) | |
elif method == 'PUT': | |
r = requests.put(build_url(resource), data=data, headers=headers, verify=False) | |
except requests.exceptions.RequestException as e: # This is the correct syntax | |
print(str(e) + ": " + build_url(resource)) | |
sys.exit(1) | |
if r.status_code not in range(200,206): | |
e = r.json() | |
print('%s error connecting to %s, HTTP status code: %s' % (__name__, url, r.status_code)) | |
sys.exit(1) | |
if 'download' in resource: | |
return r.content | |
else: | |
return r.json() | |
#=====================================================# | |
def export_status(scan_id, file_id): | |
data = connect('GET', '/scans/{0}/export/{1}/status'.format(scan_id, file_id)) | |
return data['status'] == 'ready' | |
#=====================================================# | |
def get_last_history_index(history): | |
count = 0 | |
for hist in history: | |
count = count+1 | |
return count-1 | |
#=====================================================# | |
def get_last_history(scan_id): | |
data = connect('GET', '/scans/{0}'.format(scan_id)) | |
if data['history']: | |
return data['history'][get_last_history_index(data['history'])]['history_id'] | |
#=====================================================# | |
def get_trash_id(): | |
''' If any report is moved to Trash folder in Nessus, skip it ''' | |
data = connect('GET', '/scans') | |
# check Trash id | |
for folder in data['folders']: | |
if folder['name'] == 'Trash': | |
return folder['id'] | |
#=====================================================# | |
def get_report(scan_name, scan_id, chapter): | |
''' find report, if enabled and has scan history then download as CSV ''' | |
# get Scan ID | |
data = connect('GET', '/scans') | |
# get File ID | |
last_history = get_last_history(scan_id) | |
if last_history: | |
fmt = {"format": export_format, "chapters": chapter, "history_id": "{0}".format(last_history)} | |
data = connect('POST', '/scans/{0}/export'.format(scan_id), data=fmt) | |
file_id = data['file'] | |
else: | |
print('scan {0} ID:{1} does not have a scan history, skipping..').format(scan_name, scan_id) | |
raise Exception("no scan history") | |
while export_status(scan_id, file_id) is False: | |
time.sleep(5) | |
# Save scan file | |
if not(os.path.exists(save_path)): | |
try: | |
os.makedirs(save_path) | |
except Exception, e: | |
print("Failed to create save path: %s" % e) | |
data = connect('GET', '/scans/{0}/export/{1}/download'.format(scan_id, file_id)) | |
# format Scan export as "NessusEnv_ScanName.format" | |
scan_name = re.sub('/[!@#$%^&*()_+<>]', '_', scan_name) | |
tmp_name = server.upper()+'_'+scan_name.replace(' ', '_').lower()+'_'+chapter+'.'+export_format | |
file_name = tmp_name.format(scan_name.encode('utf-8'), file_id, export_format) | |
# remove bad characters | |
file_name = re.sub('[!@#$%^&*:;\\\~()?/_+<>=,]', '_', file_name).replace('__', '_').replace('__', '_') | |
file_name = re.sub('_vuln_by_host', '', file_name) | |
print('processing scan: %s' % chapter.upper()) | |
report_file = os.path.join(save_path, file_name) | |
print('saving report to: %s' % report_file) | |
with open(report_file, 'w') as f: | |
f.write(data) | |
print('END OF SCAN %s' % chapter.upper()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment