Last active
June 2, 2022 14:04
-
-
Save kaugm/ad030f3d52815c7fca164896f143f872 to your computer and use it in GitHub Desktop.
Simple script that compares a CloudFormation template against a master template and outputs any missing permissions. Intended for checking Eco policies.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/opt/homebrew/bin/python3 | |
''' Help: Ensure policy.py is executable -> 'chmod u+x policy.py' | |
Help: Ensure path to python executable is correct on first line -> 'which python3' | |
''' | |
import re, sys, urllib.request, os | |
# Matches one double-quoted IAM action of the form "service:Action" and captures
# the text between the quotes.
#   - service prefix: letters, digits and hyphens (ec2, route53, application-autoscaling)
#   - action name:    letters, digits and the '*' wildcard (DescribeInstances, Describe*, *)
# Strings with more than one colon (ARNs, "AWS::IAM::Role") are not matched.
permissions_regex = r'"([a-zA-Z0-9-]+:[a-zA-Z0-9*]+)"'

# Locations of the master templates. They are blank in this copy of the script
# and must be filled in before the matching policy type can be loaded.
eco_policy_url = ''
# NOTE(review): not referenced anywhere in the visible script - presumably kept
# for checking against the previous Eco policy; confirm before removing.
eco_policy_url_old = ''
spot_policy_url = ''
class Policy:
    '''AWS policy actions parsed out of a CloudFormation template.

    Attributes:
        template: Local path or http(s) URL of the template, exactly as given.
        name: String form of ``template``; used as the label in ``__str__``.
        actions: Dict mapping each service prefix (the text before ':') to the
            list of action names found for it, in first-seen order and without
            duplicates. A service granted '*' is stored as the list ['*'].
    '''

    def __init__(self, template):
        '''Read ``template`` and index every action it contains.

        Raises whatever opening or downloading the template raises
        (for example FileNotFoundError or urllib.error.URLError).
        '''
        self.template = template
        self.name = str(template)
        self.actions = self._extract_actions()

    def _read_template(self):
        '''Return the template text, downloaded or read from disk.'''
        # Only a real URL scheme selects the download path; a local file whose
        # name merely contains "http" must still be opened from disk.
        if self.name.lower().startswith(('http://', 'https://')):
            with urllib.request.urlopen(self.template) as response:
                # Decode the payload instead of parsing the repr of a bytes object.
                return response.read().decode('utf-8', errors='replace')
        with open(self.template) as template_file:
            return template_file.read()

    def _extract_actions(self):
        '''Extract all AWS Policy actions from Cloudformation Template and add to object dictionary'''
        allowed_actions = {}
        # Scan the whole text at once so that several actions written on the
        # same line are all picked up, for remote and local templates alike.
        for permission in re.findall(permissions_regex, self._read_template()):
            resource, action = permission.split(':')
            granted = allowed_actions.get(resource)
            if granted is None:
                # Resource not added to dictionary
                allowed_actions[resource] = [action]
            elif granted[0] == '*':
                # Everything is already allowed for this resource; nothing to add
                continue
            elif action == '*':
                # A full wildcard supersedes the individual actions seen so far
                allowed_actions[resource] = [action]
            elif action not in granted:
                # Record each distinct action once
                granted.append(action)
        return allowed_actions

    def __str__(self):
        '''Return one "resource : action" line per action, with a blank line after each resource.'''
        lines = [f'Allowed actions for {self.name}:']
        for resource, actions in self.actions.items():
            lines.extend(f'{resource} : {action}' for action in actions)
            lines.append('')
        return '\n'.join(lines) + '\n'
def compare(eco_policy, customer_policy):
    '''Compare customer Eco policy to latest Eco full permissions policy and output differences

    Args:
        eco_policy: Object whose ``actions`` dict (resource -> list of actions)
            holds the permissions that are required.
        customer_policy: Object whose ``actions`` dict holds the permissions
            the customer has granted. Entries may contain the wildcards '*'
            and '?' (for example '*' or 'Describe*').

    Returns:
        Dict with one key per resource of ``eco_policy``, mapping to the list
        of required actions that no customer entry covers. The list is empty
        when the resource is fully covered. The lists are new objects, so
        editing the result never changes either policy.
    '''
    from fnmatch import fnmatchcase

    missing_permissions = {}
    for resource, actions in eco_policy.actions.items():
        # Resource not present in customer policy -> nothing is granted for it
        granted = customer_policy.actions.get(resource, [])
        # A required action is covered when any granted entry matches it, either
        # literally or through a wildcard such as '*' or 'Describe*'.
        missing_permissions[resource] = [
            action for action in actions
            if not any(fnmatchcase(action, pattern) for pattern in granted)
        ]
    return missing_permissions
def output(permissions_dict):
    '''Print the missing permissions, one "resource:action" per line.

    Args:
        permissions_dict: Dict mapping a resource to the list of actions that
            are missing for it; resources with an empty list are skipped.

    Prints "none" under the header when no resource has a missing action.
    Returns None.
    '''
    missing_lines = [
        f'{resource}:{action}\n'
        for resource, actions in permissions_dict.items()
        for action in actions
    ]
    summary = '\nMissing permissions for Customer Eco Policy:\n\n'
    # Decide on the collected lines themselves rather than on the length of the
    # header text, so rewording the header cannot break the "none" case.
    summary += ''.join(missing_lines) if missing_lines else 'none\n'
    print(summary)
def main(argv=None):
    '''Compare one customer template against the chosen master policy.

    Args:
        argv: Full argument vector including the program name; defaults to
            ``sys.argv``. Accepted forms after the program name are
            ``<eco|spot> <template>`` and ``<word> <eco|spot> <template>``.

    Returns:
        0 when the comparison was printed, 1 on a usage error or when either
        policy could not be read or compared.
    '''
    argv = sys.argv if argv is None else argv
    args = list(argv[1:])
    # The `check` shell shortcut forwards its own sub-command ("policy") as the
    # first argument, so a three-argument call carries one leading word to drop.
    if len(args) == 3:
        args = args[1:]
    # Ensure proper argument count - and stop instead of carrying on
    if len(args) != 2:
        print(f'{len(argv[1:])} Arguments entered: {argv[1:]}:')
        print('Error: Please supply a policy type and exactly 1 template.\nUsage: policy.py [ eco | spot ] <template>\n')
        return 1

    requested_type, template_path = args
    policy_type = requested_type.lower()
    if policy_type not in ('eco', 'spot'):
        print(f'\nUnknown policy type: {requested_type}. Expecting "eco" or "spot". Exiting...\n')
        return 1
    model_url = eco_policy_url if policy_type == 'eco' else spot_policy_url

    try:
        # Read in model policy template
        try:
            correct_permissions = Policy(model_url)
        except Exception as error:
            print(f'Error reading in {policy_type} policy from internet: {error}')
            return 1
        # Testing - Uncomment the line below to ensure permissions are being pulled from remote source
        # print(correct_permissions)

        # Read in policy template to test
        try:
            customer_policy_test = Policy(template_path)
            output(compare(correct_permissions, customer_policy_test))
        except Exception as error:
            print(f'Error reading in test policy OR comparing policies: {error}')
            return 1
    finally:
        # The template under test is removed once a comparison has been
        # attempted, whether or not it succeeded.
        # NOTE(review): presumably the template is a throwaway download - confirm.
        try:
            os.remove(template_path)
        except FileNotFoundError:
            print(f'Unable to delete {template_path} because it cannot be found.')
    return 0


if __name__ == "__main__":
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Shortcut for use in .zshrc or .bashrc
# check policy [ eco | spot ] file
# Forwards the complete argument list - including the leading "policy" word -
# to ~/bin/policy.py. Arguments are quoted so a template path containing
# spaces reaches the script as a single argument.
check() {
    case "$1" in
        policy) ~/bin/policy.py "$@" ;;
        *) echo "Usage: check policy [ eco | spot ] file" ;;
    esac
}