|
from bs4 import BeautifulSoup |
|
import re |
|
import requests |
|
|
|
# Disable warnings to prevent annoying messages in the console |
|
requests.packages.urllib3.disable_warnings() |
|
|
|
ASA_USERNAME = 'APIUser' |
|
ASA_PASSWORD = 'xxxx' |
|
ASA_IP = "10.x.x.xx" |
|
ASA_PREFIX = "WVD_AU_East_" |
|
|
|
AZURE_REGION_PREFIX = "WindowsVirtualDesktop.AustraliaEast" |
|
|
|
IP_REGEX = "^((25[0-5]|(2[0-4]|1[0-9]|[1-9]|)[0-9])(\.(?!$)|$)){4}$" |
|
# So for whatever reason the ASA barfs at Python User agent, but works with Postman. Go figure. |
|
HTTP_HEADERS = {'User-Agent': 'PostmanRuntime/7.36.0'} |
|
|
|
|
|
def get_asa_network_objects(asa_prefix, asa_ip, asa_username, asa_password): |
|
asa_base_url = f"https://{asa_ip}/gadmin/exec/show+running-config+" # ASA Base URL |
|
asa_object_types = ["object", "object-group"] # URLs to query against the ASA3 |
|
return_object = [] # Object array to return what ever object types we want as per above asa_objects |
|
for asa_object in asa_object_types: |
|
# Need to do HTTP GET first to get the csrf_token token in the response to include it in the POST back. |
|
asa_get_csrf = requests.get(asa_base_url + asa_object, headers=HTTP_HEADERS, verify=False) |
|
if asa_get_csrf.status_code == 200: |
|
asa_bs = BeautifulSoup(asa_get_csrf.text, "html.parser") |
|
if asa_bs.find("input", attrs={"name": 'csrf_token'}): |
|
csrf_token = asa_bs.find("input", attrs={"name": 'csrf_token'})["value"] |
|
# Now we have a csrf token post back to the same URL with username, password and csrf_token |
|
data = {'username': asa_username, 'password': asa_password, 'csrf_token': csrf_token} |
|
post_data = requests.post(asa_base_url + asa_object, data=data, headers=HTTP_HEADERS, verify=False) |
|
print(len(post_data.text)) |
|
if len(post_data.text) > 0: |
|
has_entry = False # Flag to say we are currently parsing an entry |
|
return_array = [] # Return array entry |
|
for post_line in post_data.text.splitlines(): |
|
# Iterate through the entry as the first character in the line is always a space |
|
# print(post_line) |
|
if has_entry and post_line[0] == " ": |
|
# If we are an object then do a split after the subnet string and return everything |
|
if asa_object == "object": |
|
if "host" in post_line: |
|
return_array.append(post_line.split('host ')[1]) |
|
elif "subnet" in post_line: |
|
return_array.append(post_line.split('subnet ')[1]) |
|
# If we are an object-group then do a split after the " object " string and return everything |
|
if asa_object == "object-group": |
|
return_array.append(post_line.split(' object ')[1]) |
|
else: |
|
# If we has_entry but the line doesn't start with a space we are at the next entry |
|
has_entry = False |
|
# It's the first entry of the desired record based on prefix |
|
if post_line.startswith(asa_object+" network " + asa_prefix): |
|
has_entry = True |
|
return_object.append(return_array) # Append the whole array to the return_object |
|
return return_object |
|
|
|
|
|
def azure_wvd_ips(region_prefix): |
|
# Retrieve Azure Public URL to find JSON URL in the document |
|
azure_wvd_url = "https://www.microsoft.com/en-us/download/details.aspx?id=56519" |
|
azure_wvd_text = requests.get(azure_wvd_url, headers=HTTP_HEADERS).text |
|
azure_wvd_bs = BeautifulSoup(azure_wvd_text, "html.parser") |
|
azure_wvd_ip = [] # Empty list for IP addresses |
|
for tag in azure_wvd_bs.find_all(['a']): |
|
# The URL for the JSON URL in the document that needs to be downloaded |
|
if "download.microsoft.com/download/7/1/D/71D86715-5596-4529-9B13-DA13A5DE5B63" in str(tag): |
|
azure_wvd_json = requests.get(tag['href']).json() |
|
for value in azure_wvd_json['values']: |
|
# Find the WVDs in Australia, assuming they are correct |
|
if region_prefix in value['name']: |
|
azure_wvd_ip += value['properties']['addressPrefixes'] |
|
return azure_wvd_ip |
|
|
|
|
|
def diff_asa_azure(asa_prefix, asa, wvd): |
|
# Build CIDR map using loop - from: https://gist.github.com/vndmtrx/dc412e4d8481053ddef85c678f3323a6 |
|
subnet_map = {} |
|
for cidr in range(0, 33): |
|
subnet_map.update({str(cidr): ".".join( |
|
[str((((1 << 32) - 1) << (32 - cidr) >> cidr_number) & 255) for cidr_number in reversed(range(0, 32, 8))])}) |
|
|
|
# Create network objects with WVD prefix |
|
for wvd_ip in wvd: |
|
ip_split = wvd_ip.split("/") |
|
if re.match(IP_REGEX, str(ip_split[0])): |
|
object_name = f"{asa_prefix}{ip_split[0].replace('.', '_')}" |
|
object_string = f"{ip_split[0]} {subnet_map[ip_split[1]]}" |
|
# Checking if the address is already in the ASA |
|
if object_string in asa[0]: |
|
print(f"! Existing object entry {object_string}") |
|
asa[0].remove(object_string) |
|
else: |
|
print(f"! Add {object_name}") |
|
print(f'object network {object_name}') |
|
if ip_split[1] == "32": |
|
print(f' host {ip_split[0]}') |
|
else: |
|
print(f' subnet {object_string}') |
|
# else: ** TODO Support IPv6 |
|
# print(f'IPv6 {ip_split[0]}') |
|
|
|
# Manage object-group with SPLIT_TUNNEL suffix |
|
print(f'object-group network {asa_prefix}SPLIT_TUNNEL') |
|
for wvd_ip in wvd: |
|
ip_split = wvd_ip.split("/") |
|
object_name = f"{asa_prefix}{ip_split[0].replace('.', '_')}" |
|
if re.match(IP_REGEX, str(ip_split[0])): |
|
if object_name in asa[1]: |
|
print(f"! Existing object-group entry {object_name}") |
|
asa[1].remove(object_name) |
|
else: |
|
print(f' network-object object {object_name}') |
|
|
|
for remove_entries in asa[1]: # Remove the leftovers from the object-group |
|
print(f' no network-object object {remove_entries}') |
|
|
|
print("exit") # Finished in the network-object so exit |
|
|
|
for remove_entries in asa[0]: # Remove the leftovers from the objects themselves |
|
ip_split = remove_entries.split() |
|
print(f'no object network {asa_prefix}{ip_split[0].replace(".", "_")}') |
|
|
|
|
|
if __name__ == "__main__": |
|
current_asa = get_asa_network_objects(ASA_PREFIX, ASA_IP, ASA_USERNAME, ASA_PASSWORD) |
|
print(current_asa) |
|
|
|
wvd_ips = azure_wvd_ips(AZURE_REGION_PREFIX) |
|
# print(wvd_ips) |
|
|
|
diff_asa_azure(ASA_PREFIX, current_asa, wvd_ips) |