Skip to content

Instantly share code, notes, and snippets.

@linickx
Created November 12, 2018 14:53
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linickx/12c8eb7232834e3eea4b76c4f00ac2f1 to your computer and use it in GitHub Desktop.
Save linickx/12c8eb7232834e3eea4b76c4f00ac2f1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# coding=utf-8
# Python linter configuration.
# pylint: disable=I0011
# pylint: disable=C0301
# pylint: disable=W0702
"""
Script to generate a report on Qualys, then download it.
1st public release - tweaked from a private version; some code might need checking.
# Nick Bettison - Linickx.com
"""
import sys
import os
import logging
import logging.handlers
import socket
import time
import re
from xml.etree import ElementTree
version = "1.0"

# ---------------------------------------------------------------------------
# Variables to change!
# /Start
# ---------------------------------------------------------------------------

# Qualys
# The credentials default to the placeholder values below, but can be supplied
# through the QUALYS_API_USER / QUALYS_API_PASS environment variables so that
# real secrets do not have to be written into this file.
apiuser = os.environ.get('QUALYS_API_USER', 'nick')
apipass = os.environ.get('QUALYS_API_PASS', 'linickx')
apiurl = 'https://qualysguard.qg3.apps.qualys.com/api/2.0/fo/report/'
apisleep = 10  # Time (seconds) to sleep between api calls

# Template ID, this is a list so you can have many!
apitemplateid = ["1712399"]

# Only tested with CSV
apireportformat = "csv"

# Proxy! Only passed to requests when proxies_enable is True.
proxies_enable = False
proxies = {
    'http': 'http://proxy.local:8080',
    'https': 'http://proxy.local:8080',
}

# Passed to requests as verify=
ssl_verify = True

# Prefix prepended verbatim to the downloaded report file name
# (no separator is added, so include a trailing slash for a directory).
path = ''

# Optional syslog logging
syslog_enable = False
syslog_server = "10.10.10.10"
syslog_port = 514
syslog_protocol = "udp"  # "udp" or "tcp"
syslog_facility = "local0"

# ---------------------------------------------------------------------------
# /END
# ---------------------------------------------------------------------------
# Logging Setup
logger = logging.getLogger("qualys1")
logger.setLevel(logging.INFO)  # Default logging level
formatter = logging.Formatter('[%(levelname)s] %(asctime)s %(message)s')

# Log to Console
chandler = logging.StreamHandler()
chandler.setFormatter(formatter)
#chandler.setLevel(logging.DEBUG) # Use to over-ride logging level
# Add to logger
logger.addHandler(chandler)

# Translate the configured syslog protocol into a socket type.
# An unrecognised protocol disables syslog logging rather than aborting.
if syslog_protocol == "udp":
    socktype = socket.SOCK_DGRAM
elif syslog_protocol == "tcp":
    socktype = socket.SOCK_STREAM
else:
    logger.critical("Unknown Syslog Protocol %s", syslog_protocol)
    syslog_enable = False

if syslog_enable:
    # Log to Syslog
    shandler = logging.handlers.SysLogHandler(
        address=(syslog_server, syslog_port),
        facility=syslog_facility,
        socktype=socktype,
    )
    sformatter = logging.Formatter('%(name)s - %(message)s')
    shandler.setFormatter(sformatter)
    #shandler.setLevel(logging.DEBUG) # Use to over-ride logging level
    # Add to logger
    logger.addHandler(shandler)

# requests is third-party, so it is imported after logging is configured and
# a missing install is reported through the logger. Only ImportError is
# handled here so that SystemExit / KeyboardInterrupt are not swallowed.
try:
    import requests  # http://docs.python-requests.org/en/master/
except ImportError:
    logger.error("import requests failed - type pip install requests")
    logger.debug("Exception: %s", sys.exc_info()[0])
    sys.exit(1)

# HTTP headers sent with every API request
request_headers = {
    'user-agent': 'LINICKX Downloader / Version ' + version,
    'X-Requested-With': 'QualysApi',
}
def qualys_status():
    """
    Fetch the report list from the Qualys report API.

    Sends a POST with ``action=list`` to ``apiurl`` (through ``proxies`` only
    when ``proxies_enable`` is set), raises on an HTTP error status, and
    returns the root element of the parsed XML response.
    """
    # -d 'action=list'
    post_kwargs = {
        "verify": ssl_verify,
        "auth": (apiuser, apipass),
        "headers": request_headers,
        "data": {"action": "list"},
    }
    if proxies_enable:
        post_kwargs["proxies"] = proxies

    response = requests.post(apiurl, **post_kwargs)
    logger.info("Request status code: %s", response.status_code)
    response.raise_for_status()  # Throw an error for bad status codes

    tree_root = ElementTree.fromstring(response.content)
    logger.debug(response.content)
    return tree_root
def qualys_launch(template_id=None):
    """
    Request that a report is run via the Qualys API.

    POSTs ``action=launch`` for ``template_id`` in ``apireportformat``. While
    the response message starts with "Max number of allowed", the request is
    repeated every ``apisleep`` seconds.

    The response is read positionally: ``root[0][1]`` is treated as the
    message text and ``root[0][2]`` as the list of items, where the entry
    following one whose text is "ID" holds the report id.
    NOTE(review): the positions presumably match the Qualys launch response
    layout - confirm against the API documentation.

    Returns the report id text, or None when the response carries no message,
    no item list, or no "ID" entry with a value. Raises on an HTTP error
    status or malformed XML.
    """
    # -d "action=launch&template_id=1611111&output_format=csv"
    data = {"action": "launch", "template_id": template_id, "output_format": apireportformat, "hide_header": "1"}
    request_kwargs = {
        "verify": ssl_verify,
        "auth": (apiuser, apipass),
        "headers": request_headers,
        "data": data,
    }
    if proxies_enable:
        request_kwargs["proxies"] = proxies

    while True:
        r = requests.post(apiurl, **request_kwargs)
        logger.info("Request status code: %s", r.status_code)
        r.raise_for_status()  # Throw an error for bad status codes
        root = ElementTree.fromstring(r.content)
        logger.debug(r.content)

        try:
            raw_text = root[0][1].text
        except IndexError:
            # No message element at all: nothing further can be read.
            logger.error("Unexpected launch response: %s", r.content)
            return None

        # An element without text yields None, which re.match cannot accept.
        if not re.match(r'^Max[\s]number[\s]of[\s]allowed(.*)', raw_text or ""):
            break
        logger.info(raw_text)
        logger.info("Sleeping %s seconds", apisleep)
        time.sleep(apisleep)

    try:
        item_list = root[0][2]
    except IndexError:
        # No item list: the message is the only explanation available.
        logger.error(raw_text)
        return None

    for item in item_list:
        for position, entry in enumerate(item):
            logger.debug("Checking XML Entry: %s", entry.text)
            if entry.text != "ID":
                continue
            try:
                expected_report_id = item[position + 1].text
            except IndexError:
                # "ID" was the last entry, so there is no value after it.
                continue
            logger.info("Report ID: %s", expected_report_id)
            return expected_report_id
    return None
def qualys_delete(report_id=None):
    """
    Ask the Qualys report API to delete the saved report ``report_id``.

    Sends a POST with ``action=delete`` and raises on an HTTP error status.
    Returns nothing.
    """
    post_kwargs = dict(
        verify=ssl_verify,
        auth=(apiuser, apipass),
        headers=request_headers,
        data={"action": "delete", "id": report_id},
    )
    if proxies_enable:
        post_kwargs["proxies"] = proxies

    response = requests.post(apiurl, **post_kwargs)
    logger.info("Delete Request status code: %s", response.status_code)
    response.raise_for_status()  # Throw an error for bad status codes

    # The parsed tree is not used; parsing still rejects a malformed reply.
    ElementTree.fromstring(response.content)
    logger.debug(response.content)
def _launch_reports():
    """Launch one report per configured template and return those that started."""
    reports = []
    for template_id in apitemplateid:
        expected_report_id = qualys_launch(template_id)
        if expected_report_id is None:
            logger.critical("Something went wrong, report ID not found")
            continue
        reports.append({'expected_report_id': expected_report_id, 'template_id': template_id})
    return reports


def _report_finished(expected_report_id):
    """
    Poll the report list once and say whether our report is "Finished".

    Entries are read positionally: ``report[0]`` as the id, ``report[6][0]``
    as the status and ``report[6][2]`` as the percent complete.
    NOTE(review): presumably these match the Qualys report list layout -
    confirm against the API documentation.
    """
    qualys_reports_list = qualys_status()
    report_entries = qualys_reports_list[0][1]
    logger.debug("%s Reports found", len(report_entries))

    finished = False
    for report in report_entries:
        try:
            report_id = report[0].text
            report_status = report[6][0].text
        except IndexError:
            # Entry does not have the expected layout. Only log what is
            # certain to exist, so the handler itself cannot raise.
            logger.debug("Skipping report entry with unexpected layout: %s",
                         [child.text for child in report])
            continue

        logger.debug("Report: %s Status: %s", report_id, report_status)
        if report_id != expected_report_id:
            continue

        if report_status == "Finished":
            finished = True
            logger.info("Report: %s %s, ready for download", report_id, report_status)
        else:
            try:
                report_percent = report[6][2].text  # Output the progress
            except IndexError:
                report_percent = "unknown"
            logger.info("Report %s Still %s, %s percent complete", report_id, report_status, report_percent)
    return finished


def _wait_for_report(expected_report_id):
    """
    Block until the report is "Finished", polling every ``apisleep`` seconds.

    NOTE(review): the loop only ends on the "Finished" status, so a report
    that never reaches it is polled until the script is interrupted.
    """
    while not _report_finished(expected_report_id):
        try:
            logger.debug("Sleeping %s seconds", apisleep)
            time.sleep(apisleep)
        except KeyboardInterrupt:
            logger.critical("CTRL+C Detected, quitting...")
            sys.exit(1)


def _download_report(expected_report_id, template_id):
    """Fetch a finished report to disk, then delete it from Qualys."""
    filename = path + "qualys_report_" + template_id + "." + apireportformat

    # -d 'action=fetch&id=2911119'
    request_kwargs = {
        "verify": ssl_verify,
        "auth": (apiuser, apipass),
        "headers": request_headers,
        "data": {"action": "fetch", "id": expected_report_id},
        "stream": True,
    }
    if proxies_enable:
        request_kwargs["proxies"] = proxies

    r = requests.post(apiurl, **request_kwargs)
    try:
        logger.info("Request status code: %s", r.status_code)
        r.raise_for_status()  # Throw an error for bad status codes
        # An earlier revision post-processed CSV downloads to drop the lines
        # preceding the "IP" header row; that code was commented out and the
        # body is now written to disk unchanged.
        with open(filename, 'wb') as handle:
            for block in r.iter_content(1024):
                handle.write(block)
    finally:
        # A streamed response holds its connection until it is closed.
        r.close()
    logger.info("Report downloaded - %s", filename)

    qualys_delete(expected_report_id)


def main():
    """
    Launch, wait for, download and delete a report for every template.

    Runs in three passes over ``apitemplateid``: launch all reports, wait for
    each launched report to finish, then download each one to
    ``<path>qualys_report_<template_id>.<apireportformat>`` and delete it from
    Qualys. Templates whose launch returned no report id are skipped.
    """
    # Loop 1 launch the reports
    reports = _launch_reports()

    # Loop 2 Check the report status
    for the_report in reports:
        # Take a break, new reports take time...
        logger.debug("Sleeping %s seconds before requesting the report %s", apisleep, the_report['template_id'])
        time.sleep(apisleep)
        _wait_for_report(the_report['expected_report_id'])

    # Loop 3 Download Reports
    for the_report in reports:
        _download_report(the_report['expected_report_id'], the_report['template_id'])

    logger.info("FINISHED!")
# Entry point: only run the report workflow when executed as a script,
# not when this file is imported as a module.
if __name__ == "__main__":
    main()
@mgaryGCM
Copy link

Are you still maintaining or using this script for use with PowerBI? I saw your webpage about this but am curious if you've tried a custom PowerBI connector to avoid running Python or hosting this on a jump server (i.e. cloud to cloud).

@linickx
Copy link
Author

linickx commented Nov 20, 2020

Yeah, I'm still using it... not tried a PowerBI Connector.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment