-
-
Save linickx/12c8eb7232834e3eea4b76c4f00ac2f1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding=utf-8 | |
# Python linter configuration. | |
# pylint: disable=I0011 | |
# pylint: disable=C0301 | |
# pylint: disable=W0702 | |
""" | |
Script to Generate a report on qualys, then download it. | |
1st Public Release - Tweaked from Private version, some code might need checking. | |
# Nick Bettison - Linickx.com | |
""" | |
import sys | |
import os | |
import logging | |
import logging.handlers | |
import socket | |
import time | |
import re | |
from xml.etree import ElementTree | |
version = "1.0"

# ---------------------------------------------------------------
# Variables to change!
# /Start
# ---------------------------------------------------------------

# Qualys API credentials and endpoint.
# NOTE(security): credentials are hard-coded; consider reading them
# from environment variables or a config file outside the repo.
apiuser = 'nick'
apipass = 'linickx'
apiurl = 'https://qualysguard.qg3.apps.qualys.com/api/2.0/fo/report/'
apisleep = 10  # Time to sleep between api calls (seconds)

# Template ID, this is an array so you can have many!
apitemplateid = ["1712399"]

# Report output format - only tested with CSV
apireportformat = "csv"

# Proxy!
proxies_enable = False
proxies = {
    'http': 'http://proxy.local:8080',
    'https': 'http://proxy.local:8080'
}

# Verify TLS certificates on API calls
ssl_verify = True

# Directory prefix for downloaded reports ('' = current directory)
path = ''

# Optional syslog logging output
syslog_enable = False
syslog_server = "10.10.10.10"
syslog_port = 514
syslog_protocol = "udp"  # "udp" or "tcp"
syslog_facility = "local0"

# ---------------------------------------------------------------
# /END
# ---------------------------------------------------------------
# Logging Setup: one named logger, console always on, syslog optional.
logger = logging.getLogger("qualys1")
logger.setLevel(logging.INFO)  # Default Logging Level

# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('[%(levelname)s] %(asctime)s %(message)s'))
#console_handler.setLevel(logging.DEBUG)  # Use to over-ride logging level
logger.addHandler(console_handler)

# Map the configured syslog protocol onto a socket type.
if syslog_protocol == "tcp":
    socktype = socket.SOCK_STREAM
elif syslog_protocol == "udp":
    socktype = socket.SOCK_DGRAM
else:
    logger.critical("Unknown Syslog Protocol %s", syslog_protocol)
    syslog_enable = False  # can't use syslog without a valid protocol

if syslog_enable:
    # Log to Syslog as well as the console.
    syslog_handler = logging.handlers.SysLogHandler(
        address=(syslog_server, syslog_port),
        facility=syslog_facility,
        socktype=socktype,
    )
    syslog_handler.setFormatter(logging.Formatter('%(name)s - %(message)s'))
    #syslog_handler.setLevel(logging.DEBUG)  # Use to over-ride logging level
    logger.addHandler(syslog_handler)
# Third-party HTTP client; the script cannot run without it.
try:
    import requests  # http://docs.python-requests.org/en/master/
except ImportError:
    # Catch only ImportError: a bare except would also swallow
    # unrelated failures such as KeyboardInterrupt / SystemExit.
    logger.error("import requests failed - type pip install requests")
    logger.debug("Exception: %s", sys.exc_info()[0])
    sys.exit(1)

# HTTP headers sent on every API call.
request_headers = {}
request_headers['user-agent'] = 'LINICKX Downloader / Version ' + version
request_headers['X-Requested-With'] = 'QualysApi'
def qualys_status():
    """
    Return the report list from the Qualys API as a parsed XML root.

    Posts ``action=list`` to the report endpoint.

    Returns:
        xml.etree.ElementTree.Element: root element of the report-list XML.

    Raises:
        requests.HTTPError: if the API returns a bad status code.
    """
    # -d 'action=list'
    data = {"action": "list"}
    # requests treats proxies=None the same as omitting the argument,
    # so a single call replaces the duplicated proxy/non-proxy branches.
    qr = requests.post(
        apiurl,
        proxies=proxies if proxies_enable else None,
        verify=ssl_verify,
        auth=(apiuser, apipass),
        headers=request_headers,
        data=data,
    )
    logger.info("Request status code: %s", qr.status_code)
    qr.raise_for_status()  # Throw an error for bad status codes
    logger.debug(qr.content)
    return ElementTree.fromstring(qr.content)
def qualys_launch(template_id=None):
    """
    Launch a report for *template_id* via the Qualys API.

    Retries (sleeping ``apisleep`` seconds between attempts) while the API
    replies that the maximum number of concurrent reports is running.

    Args:
        template_id (str): Qualys report template ID to launch.

    Returns:
        str or None: the launched report's ID, or None if it could not be
        extracted from the API response.

    Raises:
        requests.HTTPError: if the API returns a bad status code.
    """
    # -d "action=launch&template_id=1611111&output_format=csv"
    data = {
        "action": "launch",
        "template_id": template_id,
        "output_format": apireportformat,
        "hide_header": "1",
    }

    while True:
        r = requests.post(
            apiurl,
            proxies=proxies if proxies_enable else None,
            verify=ssl_verify,
            auth=(apiuser, apipass),
            headers=request_headers,
            data=data,
        )
        logger.info("Request status code: %s", r.status_code)
        r.raise_for_status()  # Throw an error for bad status codes
        root = ElementTree.fromstring(r.content)
        logger.debug(r.content)

        # Guard: the TEXT element can be empty; re.match(None) would raise.
        message = root[0][1].text or ""
        if re.match(r'^Max[\s]number[\s]of[\s]allowed(.*)', message):
            # Too many reports already running - wait and retry the launch.
            logger.info(message)
            logger.info("Sleeping %s seconds", apisleep)
            time.sleep(apisleep)
        else:
            break

    try:
        item_list = root[0][2]
    except IndexError:
        # Response carried no ITEM_LIST; surface the API's own message.
        logger.error(root[0][1].text)
        return None

    # ITEM entries alternate KEY/VALUE children: the value for the "ID"
    # key is the element immediately after it.
    for item in item_list:
        for counter, entry in enumerate(item, start=1):
            logger.debug("Checking XML Entry: %s", entry.text)
            if entry.text == "ID":
                expected_report_id = item[counter].text
                logger.info("Report ID: %s", expected_report_id)
                return expected_report_id
    return None
def qualys_delete(report_id=None):
    """
    Request that a saved report is deleted from Qualys.

    Args:
        report_id (str): ID of the report to delete.

    Raises:
        requests.HTTPError: if the API returns a bad status code.
    """
    data = {"action": "delete", "id": report_id}
    r = requests.post(
        apiurl,
        proxies=proxies if proxies_enable else None,
        verify=ssl_verify,
        auth=(apiuser, apipass),
        headers=request_headers,
        data=data,
    )
    logger.info("Delete Request status code: %s", r.status_code)
    r.raise_for_status()  # Throw an error for bad status codes
    # Parse purely to confirm the response is well-formed XML; the
    # original bound it to an unused local.
    ElementTree.fromstring(r.content)
    logger.debug(r.content)
def main():
    """
    The main loop, request the reports.

    Three phases: (1) launch a report for every configured template,
    (2) poll the API until each launched report is Finished,
    (3) download each finished report to disk and delete it from Qualys.
    """
    reports = []  # list of {'expected_report_id', 'template_id'} dicts
    # Loop 1 launch the reports
    for template_id in apitemplateid:
        expected_report_id = qualys_launch(template_id)
        if expected_report_id is None:
            # Launch failed; skip this template but keep going with the rest.
            logger.critical("Something went wrong, report ID not found")
            continue
        the_report = {'expected_report_id':expected_report_id, 'template_id':template_id}
        reports.append(the_report)
    # Loop 2 Check the report status
    for the_report in reports:
        expected_report_id = the_report['expected_report_id']
        template_id = the_report['template_id']
        """
        Take a break, new reports take time...
        """
        logger.debug("Sleeping %s seconds before requesting the report %s", apisleep, template_id)
        time.sleep(apisleep)
        """
        Begin section to request the status of the report
        """
        report_running = True # Enable the loop
        # Loop for checking the status
        while report_running:
            qualys_reports_list = qualys_status()
            number_of_reports = len(qualys_reports_list[0][1])
            logger.debug("%s Reports found", number_of_reports)
            # Loop through the qualys reports
            for report in qualys_reports_list[0][1]:
                try:
                    # By position: child 0 is the report ID, child 6 holds
                    # the STATUS element (its first child is the state text).
                    report_id = report[0].text
                    report_status = report[6][0].text
                except:
                    # NOTE(review): bare except hides real errors; should
                    # catch (IndexError, AttributeError) - confirm which
                    # malformed entries actually occur.
                    logger.debug(report[0].text) # This is a weird edge case that I've not resolved!
                    logger.debug(report[6].text)
                    logger.debug("one day I will fix this weird shizzle")
                    continue
                logger.debug("Report: %s Status: %s", report_id, report_status)
                if report_id == expected_report_id: # Found our Report
                    if report_status == "Finished": # It's finished!
                        report_running = False
                        logger.info("Report: %s %s, ready for download", report_id, report_status)
                    else:
                        # STATUS child 2 appears to be a percent-complete
                        # value while the report is still running.
                        report_percent = report[6][2].text # Output the progress
                        logger.info("Report %s Still %s, %s percent complete", report_id, report_status, report_percent)
            if report_running: # Only Sleep in long loops
                try:
                    logger.debug("Sleeping %s seconds", apisleep)
                    time.sleep(apisleep)
                except KeyboardInterrupt:
                    # Allow a clean CTRL+C exit during the long polling sleep.
                    logger.critical("CTRL+C Detected, quitting...")
                    sys.exit(1)
    # Loop 3 Download Reports
    for the_report in reports:
        expected_report_id = the_report['expected_report_id']
        template_id = the_report['template_id']
        """
        Begin section to download the report
        """
        # Destination file is named after the template, not the report ID.
        filename = path + "qualys_report_" + template_id + "." + apireportformat
        temp_filename = filename + ".tmp"
        # -d 'action=fetch&id=2911119'
        qualys_data = {"action": "fetch", "id":expected_report_id}
        if proxies_enable:
            r = requests.post(apiurl, proxies=proxies, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=qualys_data, stream=True)
        else:
            r = requests.post(apiurl, verify=ssl_verify, auth=(apiuser, apipass), headers=request_headers, data=qualys_data, stream=True)
        logger.info("Request status code: %s", r.status_code)
        r.raise_for_status() # Lazy - Throw an error for bad status codes
        # Disabled CSV header-trimming path kept for reference:
        #if apireportformat == "csv":
        #    # https://stackoverflow.com/a/14114741/1322110
        #    with open(temp_filename, 'wb') as handle:
        #        for block in r.iter_content(1024):
        #            handle.write(block)
        #    # https://stackoverflow.com/a/23615677/1322110
        #    # https://stackoverflow.com/a/27917855/1322110
        #    # Qualys is sh*t and has garbage at the top of the file!!
        #    csv_regex = '^\"IP\".*' # CSV Header Regex
        #    csv_head_found = False # By Default not found
        #    with open(temp_filename, 'r') as f: # Open Source CSV
        #        with open(filename, 'w') as f1: # Write Destination CSV
        #            for line in f:
        #                if re.match(csv_regex, line): # Can we match the CSV Header
        #                    csv_head_found = True
        #                if csv_head_found: # Only write destination CSV after the Header is found, causing other top-of-file garbage to be ignored
        #                    f1.write(line)
        #    logger.info("Report downloaded - %s", filename)
        #    os.remove(temp_filename)
        #else:
        # Stream the report body to disk in 1 KiB chunks.
        with open(filename, 'wb') as handle:
            for block in r.iter_content(1024):
                handle.write(block)
        # Remove the report from Qualys once it is safely on disk.
        qualys_delete(expected_report_id)
    logger.info("FINISHED!")
# If run from interpreter, run main code function.
if __name__ == "__main__":
    main()
Yeah, I'm still using it... not tried a PowerBI Connector.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Are you still maintaining or using this script for use with PowerBI? I saw your webpage about this but am curious if you've tried a custom PowerBI connector to avoid running Python or hosting this on a jump server (i.e. cloud to cloud).