Skip to content

Instantly share code, notes, and snippets.

@kaugm
Last active June 2, 2022 14:04
Show Gist options
  • Save kaugm/ad030f3d52815c7fca164896f143f872 to your computer and use it in GitHub Desktop.
Save kaugm/ad030f3d52815c7fca164896f143f872 to your computer and use it in GitHub Desktop.
Simple script that compares a CloudFormation template against a master template and prints any missing permissions. Intended for checking an Eco policy.
#!/opt/homebrew/bin/python3
''' Help: Ensure policy.py is executable -> 'chmod u+x policy.py'
Help: Ensure path to python executable is correct on first line -> 'which python3'
'''
import re, sys, urllib.request, os
# Matches one double-quoted IAM permission such as "ec2:DescribeInstances" or
# "s3:*"; group 1 is the bare `prefix:action` text without the quotes.
# - Raw string, so the pattern contains no invalid string escapes.
# - The service prefix allows letters, digits and hyphens (e.g. `route53`,
#   `application-autoscaling`); the action allows letters, digits and `*`.
# - Neither side can contain a colon, so a match always has exactly one.
permissions_regex = r'"([a-zA-Z0-9-]+:[a-zA-Z0-9*]+)"'
# Locations of the model ("full permissions") templates. They are empty in
# this copy of the script -- presumably they must be filled in before use.
eco_policy_url = ''
eco_policy_url_old = ''
spot_policy_url = ''
class Policy:
    '''IAM actions found in a CloudFormation template, grouped by service prefix.

    Attributes:
        template: URL or local path the template is read from.
        name: String form of ``template``; used as the label in ``__str__``.
        actions: Dict mapping the text before the colon of each permission
            (e.g. ``ec2``) to the list of action names found after it. A list
            equal to ``['*']`` means a full wildcard was seen for that prefix.
    '''

    def __init__(self, template):
        '''Read ``template`` immediately and extract its actions.

        Raises whatever the underlying read raises (e.g. ``OSError`` for a
        missing file or a failed download).
        '''
        self.template = template
        self.name = str(template)
        self.actions = self._extract_actions()

    def _read_template(self):
        '''Return the template text, downloaded over HTTP(S) or read from disk.'''
        source = str(self.template)
        # Only a real URL scheme selects the network path; a local file whose
        # name merely contains "http" is still opened from disk.
        if source.lower().startswith(('http://', 'https://')):
            with urllib.request.urlopen(source) as response:
                return response.read().decode('utf-8', errors='replace')
        with open(source, encoding='utf-8', errors='replace') as template:
            return template.read()

    @staticmethod
    def _add_action(allowed_actions, resource, action):
        '''Merge one ``resource:action`` pair into ``allowed_actions`` in place.

        A ``*`` action replaces everything recorded for the resource, and once
        ``*`` is recorded further actions are ignored because they add nothing.
        Duplicate actions are stored only once.
        '''
        current = allowed_actions.get(resource)
        if current is None or action == '*':
            allowed_actions[resource] = [action]
        elif current[0] != '*' and action not in current:
            current.append(action)

    def _extract_actions(self):
        '''Extract all AWS Policy actions from Cloudformation Template and add to object dictionary'''
        allowed_actions = {}
        # Every match in the document is used, so several permissions written
        # on one line are all picked up. The pattern guarantees exactly one
        # colon per match, which makes the two-way split safe.
        for permission in re.findall(permissions_regex, self._read_template()):
            resource, action = permission.split(':')
            self._add_action(allowed_actions, resource, action)
        return allowed_actions

    def __str__(self):
        '''Return a listing with one ``resource : action`` line per action.'''
        parts = [f'Allowed actions for {self.name}:\n']
        for resource, actions in self.actions.items():
            parts.extend(f'{resource} : {action}\n' for action in actions)
            # Blank line closes each resource group.
            parts.append('\n')
        return ''.join(parts)
def compare(eco_policy, customer_policy):
    '''Compare customer Eco policy to latest Eco full permissions policy and output differences

    Args:
        eco_policy: Object whose ``actions`` dict maps a resource prefix to
            the list of actions the model policy requires.
        customer_policy: Object with the same ``actions`` shape, holding what
            the customer's template grants.

    Returns:
        Dict with one key per resource in ``eco_policy``; each value is a new
        list of the required actions the customer policy does not cover
        (empty when nothing is missing for that resource).
    '''
    from fnmatch import fnmatchcase

    missing_permissions = {}
    for resource, actions in eco_policy.actions.items():
        granted = customer_policy.actions.get(resource)
        # Resource not present in customer policy, add everything (as a copy,
        # so callers cannot mutate the model policy through the result)
        if granted is None:
            missing_permissions[resource] = list(actions)
            continue
        # A required action is covered when the customer lists it verbatim or
        # grants a wildcard pattern (`*`, `Describe*`, ...) that matches it.
        missing_permissions[resource] = [
            action for action in actions
            if action not in granted
            and not any(fnmatchcase(action, pattern) for pattern in granted)
        ]
    return missing_permissions
def output(permissions_dict):
    '''Print the missing permissions, one ``resource:action`` per line.

    Args:
        permissions_dict: Dict mapping a resource prefix to the list of
            missing actions; empty lists contribute nothing.

    Prints ``none`` under the header when no action is missing at all.
    Returns ``None``.
    '''
    header = '\nMissing permissions for Customer Eco Policy:\n\n'
    lines = [
        f'{resource}:{action}\n'
        for resource, actions in permissions_dict.items()
        # Falsy values (empty list / None) are skipped
        for action in (actions or ())
    ]
    # Decide on the collected lines themselves rather than on the length of
    # the header text, so rewording the header cannot break the "none" case.
    print(header + (''.join(lines) if lines else 'none\n'))
if __name__ == "__main__":
    # Accepted invocations:
    #   policy.py <word> [ eco | spot ] <template>   (historical form; <word> is ignored)
    #   policy.py [ eco | spot ] <template>
    # In both, the policy type and the template are the last two arguments.
    cli_args = sys.argv[1:]

    # Ensure proper argument count
    if len(cli_args) not in (2, 3):
        print(f'{len(cli_args)} Arguments entered: {cli_args}:')
        print(f'Error: Please pass a policy type and exactly 1 template.\nUsage: policy.py [ eco | spot ] <template>\n')
        sys.exit(1)
    requested_type, template_path = cli_args[-2:]
    policy_type = requested_type.lower()

    model_urls = {'eco': eco_policy_url, 'spot': spot_policy_url}
    if policy_type not in model_urls:
        print(f'\nUnknown policy type: {requested_type}. Expecting "eco" or "spot". Exiting...\n')
        sys.exit(1)

    # Read in model policy template. Without it there is nothing to compare
    # against, so stop here and leave the customer's template untouched.
    try:
        correct_permissions = Policy(model_urls[policy_type])
    except Exception as error:
        print(f'Error reading in {policy_type} policy from internet: {error}')
        sys.exit(1)

    # Testing - Uncomment the line below to ensure permissions are being pulled from remote source
    # print(correct_permissions)

    # Read in policy template to test
    exit_code = 0
    try:
        customer_policy_test = Policy(template_path)
        output(compare(correct_permissions, customer_policy_test))
    except Exception as error:
        print(f'Error reading in test policy OR comparing policies: {error}')
        exit_code = 1

    # NOTE: the checked template file is deleted once the check has been
    # attempted; this is existing behavior of the script, kept on purpose.
    try:
        os.remove(template_path)
    except FileNotFoundError:
        print(f'Unable to delete {template_path} because it cannot be found.')
    except OSError as error:
        print(f'Unable to delete {template_path}: {error}')
    sys.exit(exit_code)
@kaugm
Copy link
Author

kaugm commented May 18, 2022

Shortcut for use in .zshrc or .bashrc

# check: dispatch helper for ~/.zshrc or ~/.bashrc.
# Usage: check policy [ eco | spot ] file
# All arguments (including the leading "policy" word) are forwarded to
# ~/bin/policy.py. "$@" is quoted so a template path containing spaces is
# passed through as a single argument.
check() {
  case "$1" in
    policy) ~/bin/policy.py "$@" ;;
    *) echo "Usage: check policy [ eco | spot ] file" ;;
  esac
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment