#!/usr/bin/env python | |
# coding=utf-8 | |
# Python linter configuration. | |
# pylint: disable=I0011 | |
# pylint: disable=C0301 | |
# pylint: disable=W0702 | |
""" | |
Script to Generate a report on qualys, then download it. | |
1st Public Release - Tweaked from Private version, some code might need checking. | |
# Nick Bettison - Linickx.com | |
""" | |
import sys | |
import os | |
import logging | |
import logging.handlers | |
import socket | |
import time | |
import re | |
from xml.etree import ElementTree | |
version = "1.0" | |
""" | |
Variables to chang! | |
/Start | |
""" | |
# Qualys | |
apiuser = 'nick' | |
apipass = 'linickx' | |
apiurl = 'https://qualysguard.qg3.apps.qualys.com/api/2.0/fo/report/' | |
apisleep = 10 # Time to sleep between api calls | |
# Template ID, this is an array so you can have many! | |
apitemplateid = ["1712399"] | |
# Only tested with CSV - | |
apireportformat = "csv" | |
# Proxy! | |
proxies_enable = False | |
proxies = { | |
'http': 'http://proxy.local:8080', | |
'https': 'http://proxy.local:8080' | |
} | |
ssl_verify = True | |
path = '' | |
syslog_enable = False | |
syslog_server = "10.10.10.10" | |
syslog_port = 514 | |
syslog_protocol = "udp" | |
syslog_facility = "local0" | |
""" | |
/END | |
""" | |
# Logging Setup | |
logger = logging.getLogger("qualys1") | |
logger.setLevel(logging.INFO) # Default Loging Level | |
formatter = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s') | |
# Log to Console | |
chandler = logging.StreamHandler() | |
chandler.setFormatter(formatter) | |
#chandler.setLevel(logging.DEBUG) # Use to over-ride logging level | |
# Add to logger | |
logger.addHandler(chandler) | |
if syslog_protocol == "udp": | |
socktype = socket.SOCK_DGRAM | |
elif syslog_protocol == "tcp": | |
socktype = socket.SOCK_STREAM | |
else: | |
logger.critical("Unknown Syslog Protocol %s", syslog_protocol) | |
syslog_enable = False | |
if syslog_enable: | |
# Log to Syslog | |
shandler = logging.handlers.SysLogHandler(address = (syslog_server, syslog_port), facility = syslog_facility, socktype = socktype) | |
sformatter = logging.Formatter('%(name)s - %(message)s') | |
shandler.setFormatter(sformatter) | |
#shandler.setLevel(logging.DEBUG) # Use to over-ride logging level | |
# Add to logger | |
logger.addHandler(shandler) | |
try: | |
import requests # http://docs.python-requests.org/en/master/ | |
except: | |
logger.error("import requests failed - type pip install requests") | |
logger.debug("Exception: %s", sys.exc_info()[0]) | |
sys.exit(1) | |
request_headers = {} # Python Curl Stuff! | |
request_headers['user-agent'] = 'LINICKX Downloader / Version ' + version | |
request_headers['X-Requested-With'] = 'QualysApi' | |
def qualys_status(): | |
""" | |
Returns the status of reports on qualys API | |
""" | |
# -d 'action=list' | |
data = {"action": "list"} | |
if proxies_enable: | |
qr = requests.post(apiurl, proxies=proxies, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
else: | |
qr = requests.post(apiurl, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
logger.info("Request status code: %s", qr.status_code) | |
qr.raise_for_status() # Lazy - Throw an error for bad status codes | |
qroot = ElementTree.fromstring(qr.content) | |
logger.debug(qr.content) | |
return qroot | |
def qualys_launch(template_id=None): | |
""" | |
Requests that a report is run via qualys API | |
""" | |
# -d "action=launch&template_id=1611111&output_format=csv" | |
data = {"action": "launch", "template_id":template_id, "output_format":apireportformat, "hide_header":"1"} | |
report_wait = True # Enable the loop | |
while report_wait: | |
if proxies_enable: | |
r = requests.post(apiurl, proxies=proxies, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
else: | |
r = requests.post(apiurl, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
logger.info("Request status code: %s", r.status_code) | |
r.raise_for_status() # Lazy - Throw an error for bad status codes | |
root = ElementTree.fromstring(r.content) | |
logger.debug(r.content) | |
if re.match(r'^Max[\s]number[\s]of[\s]allowed(.*)', root[0][1].text): | |
logger.info(root[0][1].text) | |
logger.info("Sleeping %s seconds", apisleep) | |
time.sleep(apisleep) | |
else: | |
report_wait = False | |
try: | |
root[0][2] | |
except: | |
logger.error(root[0][1].text) | |
return None | |
for item in root[0][2]: | |
counter = 0 | |
for entry in item: | |
logger.debug("Checking XML Entry: %s", entry.text) | |
counter += 1 # Increment the counter early, so value is saved after IF | |
if entry.text == "ID": | |
expected_report_id = item[counter].text | |
logger.info("Report ID: %s", expected_report_id) | |
return expected_report_id | |
return None | |
def qualys_delete(report_id=None): | |
""" | |
Requests that a saved report is deleted | |
""" | |
data = {"action": "delete", "id":report_id} | |
if proxies_enable: | |
r = requests.post(apiurl, proxies=proxies, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
else: | |
r = requests.post(apiurl, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=data) | |
logger.info("Delete Request status code: %s", r.status_code) | |
r.raise_for_status() # Lazy - Throw an error for bad status codes | |
root = ElementTree.fromstring(r.content) | |
logger.debug(r.content) | |
def main(): | |
""" | |
The main loop, request the reports. | |
""" | |
reports = [] | |
# Loop 1 launch the reports | |
for template_id in apitemplateid: | |
expected_report_id = qualys_launch(template_id) | |
if expected_report_id is None: | |
logger.critical("Something went wrong, report ID not found") | |
continue | |
the_report = {'expected_report_id':expected_report_id, 'template_id':template_id} | |
reports.append(the_report) | |
# Loop 2 Check the report status | |
for the_report in reports: | |
expected_report_id = the_report['expected_report_id'] | |
template_id = the_report['template_id'] | |
""" | |
Take a break, new reports take time... | |
""" | |
logger.debug("Sleeping %s seconds before requesting the report %s", apisleep, template_id) | |
time.sleep(apisleep) | |
""" | |
Begin section to request the status of the report | |
""" | |
report_running = True # Enable the loop | |
# Loop for checking the status | |
while report_running: | |
qualys_reports_list = qualys_status() | |
number_of_reports = len(qualys_reports_list[0][1]) | |
logger.debug("%s Reports found", number_of_reports) | |
# Loop through the qualys reports | |
for report in qualys_reports_list[0][1]: | |
try: | |
report_id = report[0].text | |
report_status = report[6][0].text | |
except: | |
logger.debug(report[0].text) # This is a weird edge case that I've not resolved! | |
logger.debug(report[6].text) | |
logger.debug("one day I will fix this weird shizzle") | |
continue | |
logger.debug("Report: %s Status: %s", report_id, report_status) | |
if report_id == expected_report_id: # Found our Report | |
if report_status == "Finished": # It's finished! | |
report_running = False | |
logger.info("Report: %s %s, ready for download", report_id, report_status) | |
else: | |
report_percent = report[6][2].text # Output the progress | |
logger.info("Report %s Still %s, %s percent complete", report_id, report_status, report_percent) | |
if report_running: # Only Sleep in long loops | |
try: | |
logger.debug("Sleeping %s seconds", apisleep) | |
time.sleep(apisleep) | |
except KeyboardInterrupt: | |
logger.critical("CTRL+C Detected, quitting...") | |
sys.exit(1) | |
# Loop 3 Download Reports | |
for the_report in reports: | |
expected_report_id = the_report['expected_report_id'] | |
template_id = the_report['template_id'] | |
""" | |
Begin section to download the report | |
""" | |
filename = path + "qualys_report_" + template_id + "." + apireportformat | |
temp_filename = filename + ".tmp" | |
# -d 'action=fetch&id=2911119' | |
qualys_data = {"action": "fetch", "id":expected_report_id} | |
if proxies_enable: | |
r = requests.post(apiurl, proxies=proxies, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=qualys_data, stream=True) | |
else: | |
r = requests.post(apiurl, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=qualys_data, stream=True) | |
logger.info("Request status code: %s", r.status_code) | |
r.raise_for_status() # Lazy - Throw an error for bad status codes | |
#if apireportformat == "csv": | |
# # https://stackoverflow.com/a/14114741/1322110 | |
# with open(temp_filename, 'wb') as handle: | |
# for block in r.iter_content(1024): | |
# handle.write(block) | |
# # https://stackoverflow.com/a/23615677/1322110 | |
# # https://stackoverflow.com/a/27917855/1322110 | |
# # Qualys is sh*t and has garbage at the top of the file!! | |
# csv_regex = '^\"IP\".*' # CSV Header Regex | |
# csv_head_found = False # By Default not found | |
# with open(temp_filename, 'r') as f: # Open Source CSV | |
# with open(filename, 'w') as f1: # Wrire Destination CSV | |
# for line in f: | |
# if re.match(csv_regex, line): # Can we match the CSV Header | |
# csv_head_found = True | |
# if csv_head_found: # Only write destination CSV after the Header is found, causing other top-of-file #garbage to be ignored | |
# f1.write(line) | |
# logger.info("Report downloaded - %s", filename) | |
# os.remove(temp_filename) | |
#else: | |
with open(filename, 'wb') as handle: | |
for block in r.iter_content(1024): | |
handle.write(block) | |
qualys_delete(expected_report_id) | |
logger.info("FINISHED!") | |
# If run from interpreter, run main code function. | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment