Skip to content

Instantly share code, notes, and snippets.

@Dapacruz
Created July 25, 2022 22:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Dapacruz/0e70010d3a3ae96f363650ba55341ba3 to your computer and use it in GitHub Desktop.
Save Dapacruz/0e70010d3a3ae96f363650ba55341ba3 to your computer and use it in GitHub Desktop.
Validates recently created/modified Palo Alto Networks security policies
#!/usr/bin/env python3
'''Validates recently created/modified security policies
validate-security-policies.py
Author: David Cruz (davidcruz72@gmail.com)
Python version >= 3.9
Required Python packages:
None
Features:
Checks for duplicate address objects
'''
import argparse
import copy
import datetime as DT
import json
import os
import re
import signal
import sys
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
def sigint_handler(signum, frame):
    '''Signal handler that ends the process with exit status 1.

    Both handler arguments (signal number and current stack frame) are
    ignored; main() registers this function for SIGINT.
    '''
    raise SystemExit(1)
def query_config_log(panorama, api_key, days):
    '''Submit a config-log query for recent security rule changes.

    Asks the API at ``https://{panorama}/api/`` for successful ``edit`` /
    ``set`` config-log entries whose full path contains "security/rules"
    and that were received within the last *days* days.

    Args:
        panorama: Host name or address used to build the API URL.
        api_key: API key sent as the ``key`` request parameter.
        days: Size of the look-back window, in days.

    Returns:
        The text of the ``result/job`` element of the response (the ID of
        the log query job).

    Exits the process with status 1 (after writing to stderr) when the
    host cannot be reached, the response is not well-formed XML, or the
    response carries no job ID.
    '''
    since = (DT.datetime.now() - DT.timedelta(days=days)).strftime("%Y/%m/%d %H:%M:%S")

    # Get configuration log entries for the requested window
    params = urllib.parse.urlencode({
        'key': api_key,
        'type': 'log',
        'log-type': 'config',
        'dir': 'forward',
        'nlogs': 5000,
        'query': f'(receive_time geq "{since}") and (( cmd eq edit ) or ( cmd eq set )) and ( full-path contains "security/rules" ) and ( result eq Succeeded )',
    })
    url = f'https://{panorama}/api/?{params}'

    try:
        with urllib.request.urlopen(url) as response:
            xml = response.read().decode('utf-8')
    except OSError as err:
        sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
        sys.exit(1)

    # Malformed XML raises ET.ParseError; TypeError only covers non-string input.
    try:
        root = ET.fromstring(xml)
    except (ET.ParseError, TypeError) as err:
        sys.stderr.write(f'Unable to parse XML! ({err})\n')
        sys.exit(1)

    # A response without a job element would otherwise surface as an
    # AttributeError on None.
    job_id = root.findtext('result/job')
    if not job_id:
        sys.stderr.write(f'{panorama}: Log query did not return a job ID\n')
        sys.exit(1)
    return job_id
def get_results(panorama, api_key, job_id):
    '''Poll a log query job until it finishes and return its raw XML.

    Requests the job every 2 seconds until the ``result/job/status``
    element of the response reads ``FIN``.

    Args:
        panorama: Host name or address used to build the API URL.
        api_key: API key sent as the ``key`` request parameter.
        job_id: Job ID sent as the ``job-id`` request parameter.

    Returns:
        The undecoded-to-tree response body (a str) of the finished job.

    Exits the process with status 1 (after writing to stderr) when the
    host cannot be reached, a response is not well-formed XML, or a
    response carries no job status.
    '''
    # Get log query results
    params = urllib.parse.urlencode({
        'key': api_key,
        'type': 'log',
        'action': 'get',
        'job-id': job_id,
    })
    url = f'https://{panorama}/api/?{params}'

    while True:
        try:
            with urllib.request.urlopen(url) as response:
                xml = response.read().decode('utf-8')
        except OSError as err:
            sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
            sys.exit(1)

        # Malformed XML raises ET.ParseError; TypeError only covers non-string input.
        try:
            root = ET.fromstring(xml)
        except (ET.ParseError, TypeError) as err:
            sys.stderr.write(f'Unable to parse XML! ({err})\n')
            sys.exit(1)

        # Without a status element we cannot tell whether the job will ever
        # finish, so stop instead of failing on None.text.
        job_status = root.findtext('result/job/status')
        if job_status is None:
            sys.stderr.write(f'{panorama}: No job status in response for job {job_id}\n')
            sys.exit(1)

        if job_status == 'FIN':
            return xml
        time.sleep(2)
def create_pan_config_filter(policy_paths):
    '''Build the ``'filter=...'`` argument for the PAN-Configurator command.

    Each path that identifies a security rule contributes one clause of
    the form ``((location is <device group>) and (name eq <rule>))``;
    rules under ``shared`` use the location ``Shared``. Clauses are
    joined with `` or `` in first-seen order.

    Args:
        policy_paths: Iterable of configuration paths (strings).

    Returns:
        The filter argument, wrapped in single quotes.

    Paths that do not name a rule entry below ``shared`` or a device
    group are skipped, and identical clauses (several paths under the
    same rule) are emitted only once.
    '''
    # The location must be a whole path segment: matching a bare "shared"
    # substring would misclassify a device group such as 'not-shared'.
    regex = re.compile(
        r"(?:(?:^|/)shared/|device-group/entry\[@name='(?P<device_group>[^']+)'\]/)"
        r".*rules/entry\[@name='(?P<policy>[^']+)"
    )

    clauses = {}
    for path in policy_paths:
        m = regex.search(path)
        if m is None:
            continue
        device_group = m.group('device_group') or 'Shared'
        policy = m.group('policy')
        # dict keys keep insertion order and drop duplicates
        clauses[f'((location is {device_group}) and (name eq {policy}))'] = None

    return f"'filter={' or '.join(clauses)}'"
def get_config(panorama, api_key):
    '''Fetch the output of the "show config running" op command as text.

    Sends the op command to ``https://{panorama}/api/`` with *api_key*
    and returns the UTF-8 decoded response body. If the request fails
    with an OSError, a message is written to stderr and the process
    exits with status 1.
    '''
    query = {
        'key': api_key,
        'type': 'op',
        'cmd': '<show><config><running></running></config></show>',
    }
    url = f'https://{panorama}/api/?{urllib.parse.urlencode(query)}'

    try:
        with urllib.request.urlopen(url) as response:
            return response.read().decode('utf-8')
    except OSError as err:
        sys.stderr.write(f'{panorama}: Unable to connect to host ({err})\n')
        sys.exit(1)
def get_modified_sec_pols(policy_paths, config):
    '''Collect the elements found at *policy_paths* into one XML document.

    Every path is looked up in *config* with ``Element.find``. A copy of
    each element found gets an extra ``<device-group>`` child naming the
    device group taken from the path (``Shared`` for paths under
    ``shared``) and is serialized inside a ``<security-policies>`` root.

    Args:
        policy_paths: Iterable of ElementPath expressions (strings).
        config: Element to search.

    Returns:
        The combined document as a string.

    Paths that match nothing in *config* (e.g. deleted policies) and
    paths that lie neither under ``shared`` nor under a device group are
    skipped. *config* itself is not modified.
    '''
    # The location must be a whole path segment: matching a bare "shared"
    # substring would misclassify a device group such as 'not-shared'.
    regex = re.compile(r"(?:(?:^|/)shared/|device-group/entry\[@name='(?P<device_group>[^']+)'\]/)")

    security_policies = ['<security-policies>']
    for path in policy_paths:
        # Get security policy
        policy = config.find(path)

        # Filter out deleted security policies. Compare against None:
        # an Element without children is falsy, and truth-testing
        # Elements is deprecated.
        if policy is None:
            continue

        m = regex.search(path)
        if m is None:
            continue
        device_group = m.group('device_group') or 'Shared'

        # Insert device group into a copy so the caller's tree stays intact
        annotated = copy.deepcopy(policy)
        ET.SubElement(annotated, 'device-group').text = device_group
        security_policies.append(ET.tostring(annotated, encoding='unicode'))

    security_policies.append('</security-policies>')
    return '\n'.join(security_policies)
def parse_paths(config_log):
    '''Extract the unique configuration paths from a config-log response.

    Args:
        config_log: XML text containing ``<full-path>`` elements.

    Returns:
        A set with one ``./<path>`` string per distinct ``<full-path>``
        text, with a trailing ``/disabled`` component removed so that
        toggling a rule maps to the rule itself. Empty ``<full-path>``
        elements are ignored.

    Exits the process with status 1 (after writing to stderr) when
    *config_log* cannot be parsed as XML.
    '''
    # Malformed XML raises ET.ParseError; TypeError only covers non-string input.
    try:
        root = ET.fromstring(config_log)
    except (ET.ParseError, TypeError) as err:
        sys.stderr.write(f'Unable to parse XML! ({err})\n')
        sys.exit(1)

    # Get unique security policies.
    # removesuffix() drops only the exact '/disabled' suffix; rstrip() would
    # strip any trailing run of those characters ('.../service' -> '.../servic').
    return {
        f"./{e.text.removesuffix('/disabled')}"
        for e in root.findall('.//full-path')
        if e.text
    }
def get_addresses(address_objects, config):
    '''Return the names for which no address group members are found.

    A name is kept, in input order, when get_addr_grp_members() returns
    a falsy value for it.
    '''
    return [
        name
        for name in address_objects
        if not get_addr_grp_members(name, config)
    ]
def get_address_groups(address_objects, config):
    '''Map address group names to their member lists, following nesting.

    For every name in *address_objects* that get_addr_grp_members()
    resolves to a non-empty member list, the mapping receives
    ``name -> members`` and then the result of recursing into those
    members.
    '''
    resolved = {}
    for name in address_objects:
        members = get_addr_grp_members(name, config)
        if not members:
            continue
        resolved[name] = members
        nested = get_address_groups(members, config)
        resolved.update(nested)
    return resolved
def get_addr_grp_members(addr, config):
    '''Return the member names of the shared address group named *addr*.

    Looks for ``shared/address-group/entry[@name=addr]`` anywhere below
    *config*.

    Returns:
        The text of the group's ``static/member`` elements, or of its
        ``dynamic/member`` elements when there are no static ones; an
        empty list when the group has neither; ``None`` when no such
        group exists.
    '''
    group = config.find(f".//shared/address-group/entry[@name='{addr}']")

    # Compare against None: an Element without children is falsy, and
    # truth-testing Elements is deprecated.
    if group is None:
        return None

    members = group.findall('./static/member') or group.findall('./dynamic/member')
    return [member.text for member in members]
def import_saved_settings(settings_path):
    '''Read the JSON file at *settings_path* and return the decoded value.'''
    with open(settings_path, 'r') as settings_file:
        return json.load(settings_file)
def main():
    '''Command-line entry point.

    Reads the Panorama host and API key from ``settings.json`` (prompting
    for whatever is missing; ``--key`` overrides the stored key),
    downloads the running configuration and the recent config-log
    entries, and reports, for every recently created/modified security
    policy, source/destination addresses that are also members of an
    address group used in the same field.
    '''
    # Ctrl+C graceful exit
    signal.signal(signal.SIGINT, sigint_handler)

    # Parse command arguments
    parser = argparse.ArgumentParser(description='Validates recently created/modified security policies')
    parser.add_argument('-k', '--key', metavar='', type=str, help='API key')
    # TODO: Reset default to 7 days
    # The help text interpolates the real default so it cannot drift from it.
    parser.add_argument('-d', '--days', metavar='', type=int, default=1,
                        help='Analyze security policies created/modified last n days (default is %(default)s)')
    parser.add_argument('-p', '--pan-configurator', action='store_true', help='Include PAN-Configurator command')
    args = parser.parse_args()

    # Import settings; an absent file simply means nothing is preconfigured
    settings_path = 'settings.json'
    settings = import_saved_settings(settings_path) if os.path.exists(settings_path) else {}

    try:
        panorama = settings['panorama']
    except KeyError:
        panorama = input('Panorama Hostname: ')

    if not args.key:
        try:
            args.key = settings['key']
        except KeyError:
            args.key = input('API Key: ')

    # TEST: Import running config from file
    # config = ET.parse('config_running.xml')
    config = ET.fromstring(get_config(panorama, args.key))

    job_id = query_config_log(panorama, args.key, args.days)
    config_log = get_results(panorama, args.key, job_id)
    policy_paths = parse_paths(config_log)

    # Output PAN-Configurator command
    if args.pan_configurator:
        pan_config_filter = create_pan_config_filter(policy_paths)
        print(f"\npa_rule-edit in=api://{panorama} location=all 'actions=exportToExcel:security_policies-{DT.datetime.now().strftime('%Y.%m.%d-%H%M%S')}.xls,ResolveAddressSummary|ResolveServiceSummary' {pan_config_filter}\n")

    print()

    # Analyze recently created/modified security policies
    security_policies = ET.fromstring(get_modified_sec_pols(policy_paths, config))
    count = 0
    t1_start = time.process_time()
    for policy in security_policies.findall('./entry'):
        count += 1
        print(f'Analyzing security policy "{policy.get("name")}" in device group "{policy.find("device-group").text}"')

        # Check for duplicate address objects
        for i in ['source', 'destination']:
            address_objects = [e.text for e in policy.findall(f'.//{i}/member')]

            # Enumerate address objects
            addresses = get_addresses(address_objects, config)

            # Filter out known addresses
            address_objects = [addr for addr in address_objects if addr not in addresses]

            # Recursively enumerate address groups
            address_groups = get_address_groups(address_objects, config)

            # Check if addresses are in sibling address groups
            for addr in addresses:
                for group, members in address_groups.items():
                    if addr in members:
                        print(f'\t{i.title()} address "{addr}" found in address group "{group}"')
        print()
    t1_stop = time.process_time()

    print(f'\nAnalyzed {count} security policies in {t1_stop - t1_start:.3f} seconds')
# Run the command-line tool only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment