Last active
April 16, 2021 00:32
-
-
Save BitTheByte/b88e35864848ab1abeb52742e85e7cb5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import random
import threading
import time

import flask
import requests
# Silence urllib3's warnings; the client below issues its requests with
# verify=False.
requests.packages.urllib3.disable_warnings()
# Connection settings for the Acunetix instance.
acunetix_host = "127.0.0.1"
acunetix_port = 3443
acunetix_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Threshold the polling loop compares the scan counts against.
max_active_scans = 10

# In-memory queue of targets waiting to be scanned, and the file backing it.
pending_targets = set()
pending_file_name = "pending.scans.list"

# REST endpoint paths.
API_BASE = "/api/v1/"
API_SCAN = f"{API_BASE}scans"
API_TARGET = f"{API_BASE}targets"
# Maps a human-readable criticality name to the value sent in the
# "criticality" field of a new target.
# "critical" used to be "10", i.e. indistinguishable from "normal"; the
# Acunetix API uses 30 for critical targets.
target_criticality_list = {
    "critical": "30",
    "high": "20",
    "normal": "10",
    "low": "0",
}
# Names accepted by Acunetix.add_target().
target_criticality_allowed = list(target_criticality_list.keys())
# Maps a scan-profile name to the profile id sent as "profile_id" when a scan
# is scheduled.
scan_profiles_list = {
    "full_scan": "11111111-1111-1111-1111-111111111111",
    "high_risk_vuln": "11111111-1111-1111-1111-111111111112",
    "xss_vuln": "11111111-1111-1111-1111-111111111116",
    "sql_injection_vuln": "11111111-1111-1111-1111-111111111113",
    "weak_passwords": "11111111-1111-1111-1111-111111111115",
    "crawl_only": "11111111-1111-1111-1111-111111111117",
}
# Names accepted by Acunetix.start_scan().
scan_profiles_allowed = list(scan_profiles_list)
class AXException(Exception):
    """Error type raised by the Acunetix client.

    The class attributes are symbolic error-category strings. Every instance
    carries a ``key`` naming its category in addition to the regular
    exception message.
    """

    HTTP_ERROR = "httpError"
    AUTH_ERROR = "authError"
    SERVER_RESOURCE = "serverResource"
    NOT_ALLOWED_CRITICYLITY_PROFILE = "Criticallity not found"
    NOT_ALLOWED_SCAN_PROFILE = "Scan Profile not found"
    JSON_PARSING_ERROR = "Decoding JSON has failed"

    def __init__(self, key, message):
        """Create the error.

        Args:
            key: Category identifier stored on ``self.key``.
            message: Human-readable description; becomes ``str(error)``.
        """
        super().__init__(message)
        self.key = key
class Acunetix(object):
    """Small client for the Acunetix REST API.

    All HTTP traffic goes through ``__send_request``, which authenticates
    with the ``X-Auth`` header and decodes the JSON response body.
    """

    def __init__(self, host=None, api=None, timeout=20):
        """Store the connection settings and verify the API key.

        Args:
            host: Address of the Acunetix server, e.g. ``"127.0.0.1:3443"``.
                ``https://`` is prepended when no scheme is present.
            api: API key sent in the ``X-Auth`` header.
            timeout: Timeout passed to every ``requests`` call.

        Raises:
            AXException: If ``host`` is empty, the server cannot be reached,
                or the server answers the info request with "Unauthorized".
        """
        if not host:
            # Previously this surfaced as an opaque TypeError on ``None``.
            raise AXException("HTTP_ERROR", "Acunetix host is required")
        self.apikey = api
        self.host = self._ensure_scheme(str(host), "https://")
        self.timeout = timeout
        self.headers = {
            "X-Auth": self.apikey,
            "content-type": "application/json",
            "User-Agent": "Acunetix",
        }
        self.authenticated = self.__is_connected()
        if not self.authenticated:
            raise AXException("AUTH_ERROR", "Wrong API Key !")

    @staticmethod
    def _ensure_scheme(address, default_scheme):
        """Return ``address`` with ``default_scheme`` prepended if it has none.

        Only a *leading* ``http://`` / ``https://`` counts as a scheme, so an
        address that merely contains a URL (for example in its query string)
        is still prefixed.
        """
        if address.lower().startswith(("http://", "https://")):
            return address
        return f"{default_scheme}{address}"

    def __json_return(self, data):
        """Decode ``data`` as JSON.

        Raises:
            AXException: With key ``JSON_PARSING_ERROR`` if decoding fails.
        """
        try:
            return json.loads(data)
        except Exception as e:
            raise AXException(
                "JSON_PARSING_ERROR", f"Json Parsing has occured: {e}"
            ) from e

    def __send_request(self, method="get", endpoint="", data=None):
        """Send one API request and return the decoded JSON response.

        Args:
            method: Name of the ``requests`` function to call
                (``"get"``, ``"post"``, ``"delete"``, ...).
            endpoint: Path appended to the host; ``/`` when empty.
            data: Optional JSON-serialisable payload.

        Raises:
            AXException: ``HTTP_ERROR`` on transport failure or a 403
                response, ``JSON_PARSING_ERROR`` if the body is not JSON.
        """
        request_call = getattr(requests, method)
        url = f"{self.host}{endpoint or '/'}"
        # Do not send a literal "null" body on requests without a payload.
        body = None if data is None else json.dumps(data)
        try:
            response = request_call(
                url,
                headers=self.headers,
                timeout=self.timeout,
                data=body,
                verify=False,
            )
        except Exception as e:
            raise AXException("HTTP_ERROR", f"HTTP ERROR OCCURED: {e}") from e
        # The checks below sit outside the try block so that their
        # AXException is not caught and re-wrapped as a second HTTP_ERROR.
        if response.status_code == 403:
            raise AXException(
                "HTTP_ERROR", f"HTTP ERROR OCCURED: {response.text}"
            )
        return self.__json_return(response.text)

    def __is_connected(self):
        """Return True unless the info response mentions 'Unauthorized'."""
        return 'Unauthorized' not in str(self.info())

    def info(self):
        """Fetch ``/api/v1/info``."""
        return self.__send_request(method="get", endpoint="/api/v1/info")

    def targets(self):
        """Fetch the target listing (requested with ``pagination=200``)."""
        return self.__send_request(
            method="get", endpoint=f"{API_TARGET}?pagination=200"
        )

    def add_target(self, target="", criticality="normal"):
        """Register a new target.

        Args:
            target: Address to scan; ``http://`` is prepended when it does
                not start with a scheme.
            criticality: One of ``target_criticality_allowed``.

        Returns:
            The decoded API response.

        Raises:
            AXException: If ``criticality`` is not an allowed name, or the
                request fails.
        """
        if criticality not in target_criticality_allowed:
            raise AXException(
                "NOT_ALLOWED_CRITICYLITY_PROFILE",
                "Criticallity not found allowed values {}".format(
                    str(list(target_criticality_allowed))
                ),
            )
        data = {
            "address": self._ensure_scheme(str(target), "http://"),
            "description": "Sent from Acunetix-Python",
            "criticality": target_criticality_list[criticality],
        }
        return self.__send_request(method="post", endpoint=API_TARGET, data=data)

    def delete_target(self, target_id):
        """Delete one target, best effort.

        Returns:
            The decoded API response, or ``None`` if the request raised an
            ``AXException`` (for instance because the response body could
            not be decoded as JSON).
        """
        try:
            return self.__send_request(
                method="delete", endpoint=f"{API_TARGET}/{target_id}"
            )
        except AXException:
            # Deliberately ignored; only client errors are swallowed now,
            # not KeyboardInterrupt/SystemExit as with the old bare except.
            return None

    def delete_all_targets(self):
        """Delete targets page by page until the listing comes back empty.

        NOTE(review): because ``delete_target`` swallows failures, this loops
        forever if the server keeps returning targets it refuses to delete.
        """
        while True:
            listed = self.targets()["targets"]
            if not listed:
                break
            for target in listed:
                self.delete_target(target["target_id"])

    def running_scans(self):
        """Fetch scans matching the query ``status:processing``."""
        return self.__send_request(
            method="get", endpoint=f"{API_SCAN}?l=100&q=status:processing;"
        )

    def starting_scans(self):
        """Fetch scans matching the query ``status:starting``."""
        return self.__send_request(
            method="get", endpoint=f"{API_SCAN}?l=100&q=status:starting;"
        )

    def start_scan(self, address=None, target_id=None, scan_profile="full_scan"):
        """Schedule a scan that starts immediately.

        Args:
            address: Address to register as a new target; used only when
                ``target_id`` is not given.
            target_id: Id of an existing target.
            scan_profile: One of ``scan_profiles_allowed``.

        Returns:
            A ``(target_id, scan_profile, response)`` tuple, where
            ``response`` is the decoded API response. Note that the second
            element is the profile *name*, not a scan id.

        Raises:
            AXException: If ``scan_profile`` is not an allowed name, or a
                request fails.
        """
        if scan_profile not in scan_profiles_allowed:
            raise AXException(
                "NOT_ALLOWED_SCAN_PROFILE",
                "Scan Profile not found allowed values {}".format(
                    str(list(scan_profiles_allowed))
                ),
            )
        if address and not target_id:
            target_id = self.add_target(target=address)["target_id"]
        scan_payload = {
            "target_id": str(target_id),
            "profile_id": scan_profiles_list[scan_profile],
            "schedule": {"disable": False, "start_date": None, "time_sensitive": False},
        }
        response = self.__send_request(
            method="post", endpoint=API_SCAN, data=scan_payload
        )
        return target_id, scan_profile, response
app = flask.Flask(import_name='')


@app.route("/add_target/<host>")
def add_interface(host):
    """Queue ``host`` for scanning and report the new queue length.

    The response is served as ``text/plain``: ``host`` comes straight from
    the URL, so echoing it back as HTML would allow script injection.
    """
    pending_targets.add(host)
    return flask.Response(
        f"Added {host} to pending targets queue. len = {len(pending_targets)}",
        mimetype="text/plain",
    )


@app.route("/show_targets")
def target_interface():
    """Return the pending targets, one per line, as ``text/plain``.

    Plain text keeps user-supplied target names from being rendered as HTML.
    """
    return flask.Response('\n'.join(pending_targets), mimetype="text/plain")
def populate_from_file(name, _list):
    """Add every non-blank line of a file to a set.

    Args:
        name: Path of the text file to read.
        _list: Set (or any object with ``update``) that receives the
            whitespace-stripped lines; it is modified in place.

    Returns:
        The same ``_list`` object that was passed in.
    """
    # The context manager closes the handle, which the old
    # ``open(...).read()`` one-liner never did.
    with open(name, "r") as handle:
        _list.update(line.strip() for line in handle if line.strip())
    return _list
# Serve the HTTP interface from a background thread while the main thread
# polls Acunetix.
threading.Thread(target=lambda: app.run(host="0.0.0.0", port=80)).start()
acunetix = Acunetix(host="%s:%i" % (acunetix_host, acunetix_port), api=acunetix_token)

# Make sure the queue file exists before the loop starts reading it.
if not os.path.isfile(pending_file_name):
    with open(pending_file_name, "w") as pending_file:
        pending_file.write("")

while 1:
    try:
        time.sleep(5)
        # Merge targets listed in the queue file into the in-memory queue.
        populate_from_file(pending_file_name, pending_targets)
        print(f"[INFO] number of targets in qeueue = {len(pending_targets)}")
        n_running_scans = len(acunetix.running_scans()['scans'])
        n_starting_scans = len(acunetix.starting_scans()['scans'])
        # Running and starting scans are both active, so the limit applies
        # to their sum; checking each count separately let the total exceed
        # max_active_scans.
        if n_running_scans + n_starting_scans >= max_active_scans or len(pending_targets) == 0:
            print(f"[WARNING] max_active_scans of {max_active_scans} scan(s) is reached or no pending targets")
            continue
        target = pending_targets.pop()
        try:
            # start_scan returns (target_id, scan_profile, response); the
            # second value is the profile name, not a scan id.
            target_id, scan_profile, _ = acunetix.start_scan(target)
        except Exception as e:
            print(e)
            # Put the target back so it is retried later. The previous
            # recovery path called random.shuffle() on a set, which raises
            # TypeError, so the queue was never actually restored.
            pending_targets.add(target)
            continue
        print(f"[INFO] started new scan for {target} with target_id={target_id}, scan_profile={scan_profile}")
        # Persist the remaining queue so the started target is not reloaded.
        with open(pending_file_name, "w") as pending_file:
            pending_file.write('\n'.join(pending_targets))
    except Exception as e:
        # Top-level boundary: report the problem and keep polling.
        print(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment