import fnmatch
import sys

import boto3
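def has_ecs_create_permissions(policy_document):
    """
    Return the first Allow-ed action that can create or launch ECS resources.

    The result is a one-element list holding the matching action (in its
    original spelling) or an empty list when nothing matches, so callers can
    use it both as a boolean and as a printable finding.

    Matching is a case-insensitive substring test against the ECS action
    families that create or launch resources ("ecs:Create", "ecs:RunTask",
    "ecs:StartTask", "ecs:Register"); "ecs:CreateService" and "ecs:Create*"
    both match. Explicit Deny statements, NotAction statements and broad
    wildcards such as "ecs:*" or "*" are not evaluated, so a policy that
    relies on those needs a manual look.

    Args:
        policy_document: an IAM policy document as a dict (boto3 already
            decodes PolicyVersion.Document / PolicyDocument to a dict).

    Returns:
        list[str]: ``[matching_action]`` or ``[]``.
    """
    # Compared against lower-cased action names: IAM action names are
    # case-insensitive, so "ECS:RunTask" grants the same permission.
    ecs_create_markers = ("ecs:create", "ecs:runtask", "ecs:starttask", "ecs:register")

    # IAM allows "Statement" to be a single object as well as a list.
    statements = policy_document.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]

    for statement in statements:
        actions = statement.get("Action")
        # NotAction-only statements have no "Action" key and are skipped;
        # only Allow statements can grant anything.
        if actions is None or statement.get("Effect") != "Allow":
            continue
        if isinstance(actions, str):
            actions = [actions]
        for action in actions:
            lowered = action.lower()
            if any(marker in lowered for marker in ecs_create_markers):
                return [action]
    return []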
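def has_ecs_tag_resource_permission(policy_document):
    """
    Return True when the policy document Allow-s ecs:TagResource.

    An action entry matches if it names ecs:TagResource exactly or via an
    IAM wildcard pattern ("ecs:Tag*", "ecs:*", "*"); the comparison is
    case-insensitive because IAM action names are. Explicit Deny and
    NotAction statements are not evaluated, so a Deny elsewhere in the
    role's effective policy can still withhold the permission.

    Args:
        policy_document: an IAM policy document as a dict (boto3 already
            decodes PolicyVersion.Document / PolicyDocument to a dict).

    Returns:
        bool: True if some Allow statement covers ecs:TagResource.
    """
    # IAM allows "Statement" to be a single object as well as a list.
    statements = policy_document.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]

    for statement in statements:
        actions = statement.get("Action")
        if actions is None or statement.get("Effect") != "Allow":
            continue
        if isinstance(actions, str):
            actions = [actions]
        # IAM action patterns use "*" and "?" like shell globs; fnmatchcase
        # keeps everything else literal, and both sides are lower-cased so the
        # match is case-insensitive like IAM itself.
        if any(fnmatch.fnmatchcase("ecs:tagresource", action.lower()) for action in actions):
            return True
    return False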
def get_all_roles(region_name):
    """
    Return every IAM role visible to the caller, following list_roles pagination.

    Args:
        region_name: region handed to the boto3 IAM client (IAM is a global
            service, so this only selects the endpoint).

    Returns:
        list[dict]: the "Roles" entries of every page, in API order.
    """
    iam = boto3.client("iam", region_name=region_name)
    pages = iam.get_paginator("list_roles").paginate()
    return [role for page in pages for role in page["Roles"]]
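def _role_policy_documents(iam_client, role_name, policy_cache):
    """
    Return the policy documents that govern *role_name*.

    Covers both attached managed policies (their default version) and inline
    role policies; the original scan only looked at attached policies, which
    both misses roles whose create permission is inline and wrongly flags
    roles whose ecs:TagResource grant is inline.

    Args:
        iam_client: a boto3 IAM client.
        role_name: the IAM role name.
        policy_cache: dict mapping a managed-policy ARN to its document;
            shared across roles so a policy attached to many roles is
            fetched once.

    Returns:
        list[dict]: decoded policy documents.
    """
    documents = []
    attached = iam_client.get_paginator("list_attached_role_policies")
    for page in attached.paginate(RoleName=role_name):
        for policy in page.get("AttachedPolicies", []):
            arn = policy["PolicyArn"]
            if arn not in policy_cache:
                version_id = iam_client.get_policy(PolicyArn=arn)["Policy"]["DefaultVersionId"]
                version = iam_client.get_policy_version(PolicyArn=arn, VersionId=version_id)
                policy_cache[arn] = version["PolicyVersion"]["Document"]
            documents.append(policy_cache[arn])
    inline = iam_client.get_paginator("list_role_policies")
    for page in inline.paginate(RoleName=role_name):
        for policy_name in page.get("PolicyNames", []):
            inline_policy = iam_client.get_role_policy(RoleName=role_name, PolicyName=policy_name)
            documents.append(inline_policy["PolicyDocument"])
    return documents


def scan_roles(region_name):
    """
    Flag IAM roles that can create ECS resources but lack ecs:TagResource.

    Each role's attached managed policies and inline policies are inspected.
    A role is flagged and printed when at least one document grants an ECS
    create/launch action and no document grants ecs:TagResource. Only the
    role's own policies are read: permission boundaries, SCPs and explicit
    Deny statements are not evaluated, so treat the output as a candidate
    list rather than an authoritative access decision.

    Args:
        region_name: region handed to the boto3 IAM client.

    Returns:
        list[str]: names of the flagged roles, in list_roles order.
    """
    iam_client = boto3.client("iam", region_name=region_name)
    policy_cache = {}
    flagged_roles = []
    for role in get_all_roles(region_name):
        role_name = role["RoleName"]
        policy_documents = _role_policy_documents(iam_client, role_name, policy_cache)
        # Flatten the per-document findings into one list of matched actions.
        create_actions = [
            action
            for doc in policy_documents
            for action in has_ecs_create_permissions(doc)
        ]
        has_tag_resource = any(has_ecs_tag_resource_permission(doc) for doc in policy_documents)
        if create_actions and not has_tag_resource:
            print(f"Role {role_name} flagged, actions {create_actions}")
            flagged_roles.append(role_name)
    return flagged_roles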
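if __name__ == "__main__":
    # Usage: python <this script> <aws-region>. The region only selects the
    # IAM endpoint; a missing argument used to surface as an IndexError.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} <aws-region>")
    region = sys.argv[1]
    flagged_roles = scan_roles(region)
    print("Flagged Roles:", flagged_roles)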