Created
July 25, 2022 22:34
-
-
Save Dapacruz/0e70010d3a3ae96f363650ba55341ba3 to your computer and use it in GitHub Desktop.
Validates recently created/modified Palo Alto Networks security policies
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
'''Validates recently created/modified security policies | |
validate-security-policies.py | |
Author: David Cruz (davidcruz72@gmail.com) | |
Python version >= 3.9 | |
Required Python packages: | |
None | |
Features: | |
Checks for duplicate address objects | |
''' | |
import argparse | |
import datetime as DT | |
import json | |
import os | |
import re | |
import signal | |
import sys | |
import time | |
import urllib.request | |
import xml.etree.ElementTree as ET | |
def sigint_handler(signum, frame):
    """Signal handler that terminates the script with exit status 1.

    Both arguments are supplied by the ``signal`` machinery and are ignored.
    """
    raise SystemExit(1)
def query_config_log(panorama, api_key, days):
    """Submit a configuration-log query and return the resulting job ID.

    The query selects successful ``edit``/``set`` commands whose full path
    contains ``security/rules`` and whose receive time is within the last
    ``days`` days (measured from the local clock).

    Args:
        panorama: Hostname used to build the ``https://<host>/api/`` URL.
        api_key: API key sent as the ``key`` request parameter.
        days: How many days back from now the query should reach.

    Returns:
        The text of the ``result/job`` element of the response.

    Exits:
        Writes a message to stderr and exits with status 1 when the host
        cannot be reached, the response is not well-formed XML, or the
        response carries no job ID.
    """
    since = (DT.datetime.now() - DT.timedelta(days=days)).strftime("%Y/%m/%d %H:%M:%S")

    # Get configuration log entries newer than the cut-off
    params = urllib.parse.urlencode({
        'key': api_key,
        'type': 'log',
        'log-type': 'config',
        'dir': 'forward',
        'nlogs': 5000,
        'query': f'(receive_time geq "{since}") and (( cmd eq edit ) or ( cmd eq set )) and ( full-path contains "security/rules" ) and ( result eq Succeeded )',
    })
    url = f'https://{panorama}/api/?{params}'

    try:
        with urllib.request.urlopen(url) as response:
            xml = response.read().decode('utf-8')
    except OSError as err:
        sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
        sys.exit(1)

    try:
        root = ET.fromstring(xml)
    # Malformed XML raises ET.ParseError; TypeError is kept for non-string input.
    except (ET.ParseError, TypeError) as err:
        sys.stderr.write(f'Unable to parse XML! ({err})\n')
        sys.exit(1)

    # A response without result/job (e.g. an error reply) has no job to poll.
    job = root.find('result/job')
    if job is None or not job.text:
        sys.stderr.write(f'{panorama}: Log query did not return a job ID\n')
        sys.exit(1)
    return job.text
def get_results(panorama, api_key, job_id):
    """Poll a log-query job until it finishes and return the raw XML response.

    The job is requested every two seconds until the ``result/job/status``
    element reads ``FIN``.

    Args:
        panorama: Hostname used to build the ``https://<host>/api/`` URL.
        api_key: API key sent as the ``key`` request parameter.
        job_id: Job identifier sent as the ``job-id`` request parameter.

    Returns:
        The decoded XML text of the response in which the job reported ``FIN``.

    Exits:
        Writes a message to stderr and exits with status 1 when the host
        cannot be reached, the response is not well-formed XML, or the
        response has no job status element.
    """
    # Get log query results
    params = urllib.parse.urlencode({
        'key': api_key,
        'type': 'log',
        'action': 'get',
        'job-id': job_id,
    })
    url = f'https://{panorama}/api/?{params}'

    while True:
        try:
            with urllib.request.urlopen(url) as response:
                xml = response.read().decode('utf-8')
        except OSError as err:
            sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
            sys.exit(1)

        try:
            root = ET.fromstring(xml)
        # Malformed XML raises ET.ParseError; TypeError is kept for non-string input.
        except (ET.ParseError, TypeError) as err:
            sys.stderr.write(f'Unable to parse XML! ({err})\n')
            sys.exit(1)

        # Without a status element there is nothing to wait for; fail clearly
        # instead of raising AttributeError on None.
        status = root.find('result/job/status')
        if status is None:
            sys.stderr.write(f'{panorama}: Unexpected response while polling log job {job_id}\n')
            sys.exit(1)

        if status.text == 'FIN':
            return xml
        time.sleep(2)
def create_pan_config_filter(policy_paths):
    """Build the quoted ``filter=`` argument for the PAN-Configurator command.

    Each path contributes one ``((location is <device group>) and (name eq
    <policy>))`` clause; clauses are joined with `` or ``. Paths located under
    ``shared`` rather than a device group use the location ``Shared``.

    Args:
        policy_paths: Iterable of configuration path strings.

    Returns:
        The string ``'filter=<clauses>'`` including the surrounding single
        quotes.
    """
    regex = re.compile(r".*(device-group/entry\[@name='(?P<device_group>[^']+)|shared).*rules/entry\[@name='(?P<policy>[^']+).*")
    security_policies = []
    for path in policy_paths:
        m = regex.match(path)
        # Paths that do not point at a named rule entry cannot be turned into
        # a filter clause; skip them instead of failing on a None match.
        if m is None:
            continue
        device_group = m.group('device_group') or 'Shared'
        policy = m.group('policy')
        security_policies.append(f'((location is {device_group}) and (name eq {policy}))')
    return f"'filter={' or '.join(security_policies)}'"
def get_config(panorama, api_key):
    """Request the running configuration and return the response body as text.

    Sends an ``op`` request carrying the ``show config running`` command to
    ``https://<panorama>/api/``. On a connection failure a message is written
    to stderr and the script exits with status 1.
    """
    query = {
        'key': api_key,
        'type': 'op',
        'cmd': '<show><config><running></running></config></show>',
    }
    endpoint = f'https://{panorama}/api/?{urllib.parse.urlencode(query)}'

    try:
        with urllib.request.urlopen(endpoint) as reply:
            return reply.read().decode('utf-8')
    except OSError as err:
        sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
        sys.exit(1)
def get_modified_sec_pols(policy_paths, config):
    """Serialize the elements found at ``policy_paths`` into one XML document.

    Each path is looked up in ``config``. Every element found is copied,
    given a ``<device-group>`` child naming its device group (``Shared`` when
    the path is under ``shared``), and serialized inside a
    ``<security-policies>`` wrapper. Paths that no longer resolve (for
    example deleted policies) are left out.

    Args:
        policy_paths: Iterable of ElementPath expressions.
        config: Element (or tree) supporting ``find``; it is not modified.

    Returns:
        A string holding the ``<security-policies>`` document.
    """
    import copy

    regex = re.compile(r".*(device-group/entry\[@name='(?P<device_group>[^']+)|shared).*")
    security_policies = ['<security-policies>']
    for path in policy_paths:
        # Get security policy
        policy = config.find(path)

        # Filter out deleted security policies. Compare against None: an
        # element without children is falsy, so a truth test would drop it.
        if policy is None:
            continue

        m = regex.match(path)
        device_group = m.group('device_group') or 'Shared'

        # Insert device group on a shallow copy so the caller's configuration
        # tree is not altered (and repeated calls do not stack duplicates).
        annotated = copy.copy(policy)
        ET.SubElement(annotated, 'device-group').text = device_group
        security_policies.append(ET.tostring(annotated, encoding='unicode'))
    security_policies.append('</security-policies>')
    return '\n'.join(security_policies)
def parse_paths(config_log):
    """Extract the unique configuration paths from a config-log XML response.

    Every ``full-path`` element contributes its text, with a trailing
    ``/disabled`` segment removed and ``./`` prepended.

    Args:
        config_log: XML text of the log query result.

    Returns:
        A set of path strings.

    Exits:
        Writes a message to stderr and exits with status 1 when
        ``config_log`` is not well-formed XML.
    """
    try:
        root = ET.fromstring(config_log)
    # Malformed XML raises ET.ParseError; TypeError is kept for non-string input.
    except (ET.ParseError, TypeError) as err:
        sys.stderr.write(f'Unable to parse XML! ({err})\n')
        sys.exit(1)

    # Get unique security policies. removesuffix drops only the literal
    # '/disabled' segment; rstrip would strip any trailing run of those
    # characters and corrupt paths such as '.../log-end'.
    policy_paths = {
        f"./{e.text.removesuffix('/disabled')}"
        for e in root.findall('.//full-path')
        if e.text
    }
    return policy_paths
def get_addresses(address_objects, config):
    """Return the names from ``address_objects`` that have no group members.

    A name is kept when ``get_addr_grp_members`` yields nothing for it,
    i.e. it is treated as a plain address rather than an address group.
    Input order is preserved.
    """
    return [
        name
        for name in address_objects
        if not get_addr_grp_members(name, config)
    ]
def get_address_groups(address_objects, config):
    """Map every address group reachable from ``address_objects`` to its members.

    Names for which ``get_addr_grp_members`` returns members are recorded,
    and those members are then expanded recursively so nested groups appear
    in the result as well.
    """
    resolved = {}
    for name in address_objects:
        members = get_addr_grp_members(name, config)
        if not members:
            continue
        resolved[name] = members
        # Descend into the members to pick up nested groups
        nested = get_address_groups(members, config)
        resolved.update(nested)
    return resolved
def get_addr_grp_members(addr, config):
    """Return the member names of the shared address group called ``addr``.

    The group is looked up at ``shared/address-group/entry[@name=addr]``
    anywhere below ``config``. Members are read from ``static/member`` and,
    when there are none, from ``dynamic/member``.

    Args:
        addr: Name of the object to look up.
        config: Element (or tree) holding the configuration.

    Returns:
        A list of member names (empty when the group has no members), or
        ``None`` when no such address group exists.
    """
    group = config.find(f".//shared/address-group/entry[@name='{addr}']")
    # Compare against None explicitly: an element's truth value reflects
    # whether it has children, not whether the lookup succeeded.
    if group is None:
        return None
    return [e.text for e in (group.findall('./static/member') or group.findall('./dynamic/member'))]
def import_saved_settings(settings_path):
    """Load the JSON document stored at ``settings_path`` and return it."""
    with open(settings_path, 'r') as handle:
        return json.load(handle)
def main():
    """Command-line entry point.

    Reads the hostname and API key from ``settings.json`` (prompting for
    whatever is missing), downloads the running configuration and the recent
    configuration log, optionally prints a PAN-Configurator command, and then
    reports every source/destination address of a recently changed security
    policy that also appears in an address group used by the same policy.
    """
    # Ctrl+C graceful exit
    signal.signal(signal.SIGINT, sigint_handler)

    # Parse command arguments
    parser = argparse.ArgumentParser(description='Validates recently created/modified security policies')
    parser.add_argument('-k', '--key', metavar='', type=str, help='API key')
    # The default now matches what the help text has always advertised.
    parser.add_argument('-d', '--days', metavar='', type=int, default=7, help='Analyze security policies created/modified last n days (default is %(default)s)')
    parser.add_argument('-p', '--pan-configurator', action='store_true', help='Include PAN-Configurator command')
    args = parser.parse_args()

    # Import settings; start from an empty mapping so a missing file simply
    # falls through to the interactive prompts.
    settings = {}
    settings_path = 'settings.json'
    if os.path.exists(settings_path):
        settings = import_saved_settings(settings_path)

    try:
        panorama = settings['panorama']
    except KeyError:
        panorama = input('Panorama Hostname: ')

    if not args.key:
        try:
            args.key = settings['key']
        except KeyError:
            args.key = input('API Key: ')

    # TEST: Import running config from file
    # config = ET.parse('config_running.xml')
    config = ET.fromstring(get_config(panorama, args.key))

    job_id = query_config_log(panorama, args.key, args.days)
    config_log = get_results(panorama, args.key, job_id)
    policy_paths = parse_paths(config_log)

    # Output PAN-Configurator command
    if args.pan_configurator:
        pan_config_filter = create_pan_config_filter(policy_paths)
        print(f"\npa_rule-edit in=api://{panorama} location=all 'actions=exportToExcel:security_policies-{DT.datetime.now().strftime('%Y.%m.%d-%H%M%S')}.xls,ResolveAddressSummary|ResolveServiceSummary' {pan_config_filter}\n")

    print()

    # Analyze recently created/modified security policies
    security_policies = ET.fromstring(get_modified_sec_pols(policy_paths, config))
    count = 0
    t1_start = time.process_time()
    for policy in security_policies.findall('./entry'):
        count += 1
        print(f'Analyzing security policy "{policy.get("name")}" in device group "{policy.find("device-group").text}"')

        # Check for duplicate address objects
        for i in ['source', 'destination']:
            address_objects = [e.text for e in policy.findall(f'.//{i}/member')]

            # Enumerate address objects
            addresses = get_addresses(address_objects, config)

            # Filter out known addresses
            address_objects = [addr for addr in address_objects if addr not in addresses]

            # Recursively enumerate address groups
            address_groups = get_address_groups(address_objects, config)

            # Check if addresses are in sibling address groups
            for addr in addresses:
                for group, members in address_groups.items():
                    if addr in members:
                        print(f'\t{i.title()} address "{addr}" found in address group "{group}"')

        print()

    t1_stop = time.process_time()
    print(f'\nAnalyzed {count} security policies in {t1_stop-t1_start :.3f} seconds')
# Run the command-line entry point only when executed as a script.
if "__main__" == __name__:
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment