Skip to content

Instantly share code, notes, and snippets.

@eamonnfaherty
Created January 20, 2021 15:07
Show Gist options
  • Save eamonnfaherty/7ac332daf24a0d6d7fc313c8ba72045c to your computer and use it in GitHub Desktop.
Save eamonnfaherty/7ac332daf24a0d6d7fc313c8ba72045c to your computer and use it in GitHub Desktop.
Terminates provisioned products and deletes products and portfolios across the accounts in your AWS Organization.
import boto3
from multiprocessing import Pool
import traceback
# config
# ARN template of the role assumed in each account; clean() fills "{}" with
# the account id via str.format.
account_access_role_arn = "arn:aws:iam::{}:role/OrganizationAccountAccessRole"
# Regions to clean; the script queues one clean() task per (account, region).
regions_to_clean = [
    "eu-west-1",
    "eu-west-2",
    "eu-west-3",
    "us-east-1",
    "us-west-2",
]
# Account ids that are skipped when the tasks are queued.
# NOTE(review): the values here look like placeholders - confirm before running.
accounts_to_ignore = [
    "1234567890",
]
# Account ids for which detach_and_remove_all_portfolios also deletes the
# portfolio shares. Despite the singular name this is a list.
# NOTE(review): presumably the service-catalog-puppet hub account - confirm.
puppet_account_id = [
    "0987654321",
]
def write_error(when, account_id, region):
    """Write the traceback of the exception being handled to a log file.

    Must be called from inside an ``except`` block: the text comes from
    ``traceback.format_exc()``, which reports the exception currently being
    handled.

    The log goes to ``errors/error_{when}_{account_id}_{region}.log`` relative
    to the current working directory and replaces any earlier log of the same
    name.

    Args:
        when: Label of the step that failed; used in the file name and text.
        account_id: Account id the failing step was working on.
        region: Region the failing step was working on.
    """
    # Function-scope import so this fix does not depend on the file's import block.
    import os

    tb = traceback.format_exc()
    # Create the directory on demand; otherwise open() raises FileNotFoundError
    # from inside the caller's error handler and the real error is never logged.
    os.makedirs("errors", exist_ok=True)
    with open(f"errors/error_{when}_{account_id}_{region}.log", "w") as f:
        f.write(
            f"""Error: {when} for {region} of {account_id}
Traceback:
{tb}
"""
        )
def clean(account_id, region_to_clean, current_count, n_accounts):
    """Clean Service Catalog resources in one region of one account.

    Assumes the role named by ``account_access_role_arn`` in ``account_id``,
    builds a ``servicecatalog`` client for ``region_to_clean`` from the
    temporary credentials and then runs, in order,
    ``detach_and_remove_all_portfolios``, ``remove_products_from_account`` and
    ``remove_provisioned_products``. Every step is best effort: a failure is
    recorded with ``write_error`` and the next step still runs.

    Args:
        account_id: Id of the account to clean.
        region_to_clean: Region name passed to the ``servicecatalog`` client.
        current_count: Position of this task, shown in progress messages.
        n_accounts: Total shown after ``current_count`` in progress messages.
    """
    sts_client = boto3.client("sts")
    spoke_role = account_access_role_arn.format(account_id)
    print(f"[{current_count}/{n_accounts}] {account_id}: Cleaning using: {spoke_role}")
    try:
        credentials = sts_client.assume_role(
            RoleArn=spoke_role, RoleSessionName=f"spoke-sts-{account_id}"
        ).get("Credentials")
    except Exception:
        # clean() is run through Pool.starmap; an exception escaping from here
        # would surface in the parent instead of being logged like every other
        # failure, so record it and skip this account/region.
        write_error("assume_role", account_id, region_to_clean)
        print(
            f"[{current_count}/{n_accounts}] {account_id}: Could not assume {spoke_role}, skipping"
        )
        return
    credentials = dict(
        aws_access_key_id=credentials.get("AccessKeyId"),
        aws_secret_access_key=credentials.get("SecretAccessKey"),
        aws_session_token=credentials.get("SessionToken"),
    )
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: cleaning"
    )
    servicecatalog_client = boto3.Session(**credentials).client(
        "servicecatalog", region_name=region_to_clean
    )
    # (label written to the error log, step) pairs, in execution order.
    steps = (
        ("detach_and_remove_all_portfolios", detach_and_remove_all_portfolios),
        ("remove_products_from_account", remove_products_from_account),
        ("remove_provisioned_products", remove_provisioned_products),
    )
    for when, step in steps:
        try:
            step(
                account_id,
                current_count,
                n_accounts,
                region_to_clean,
                servicecatalog_client,
            )
        except Exception:
            # Best effort: log the failure and carry on with the next step.
            write_error(when, account_id, region_to_clean)
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished region"
    )
    print(f"[{current_count}/{n_accounts}] {account_id}: Finished account")
def detach_and_remove_all_portfolios(
    account_id, current_count, n_accounts, region_to_clean, servicecatalog_client
):
    """Remove every owned portfolio and reject every imported portfolio share.

    For each portfolio returned by ``list_portfolios``:

    * when ``account_id`` is listed in ``puppet_account_id``, its ACCOUNT-type
      shares are deleted first (the DeletePortfolio API refuses to delete a
      portfolio that still has shared accounts);
    * ``detach_and_remove_portfolio`` then empties and deletes it.

    Afterwards every accepted ``IMPORTED`` share has its principals removed
    and is rejected. Each portfolio is handled best effort: a failure is
    recorded with ``write_error`` and the remaining portfolios are still
    processed.

    Args:
        account_id: Id of the account being cleaned.
        current_count: Position of this task, shown in progress messages.
        n_accounts: Total shown after ``current_count`` in progress messages.
        region_to_clean: Region being cleaned (used for messages and logs).
        servicecatalog_client: boto3 ``servicecatalog`` client for the account
            and region.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Cleaning portfolios"
    )
    portfolio_paginator = servicecatalog_client.get_paginator("list_portfolios")
    # Collect the listing before deleting anything so the deletions cannot
    # disturb the pagination that is still in progress.
    portfolios = [
        portfolio
        for portfolio_page in portfolio_paginator.paginate()
        for portfolio in portfolio_page.get("PortfolioDetails", [])
    ]
    for portfolio in portfolios:
        portfolio_id = portfolio.get("Id")
        portfolio_name = portfolio.get("DisplayName")
        if account_id in puppet_account_id:
            try:
                _delete_portfolio_shares(servicecatalog_client, portfolio_id)
            except Exception:
                write_error("delete_portfolio_shares", account_id, region_to_clean)
        try:
            detach_and_remove_portfolio(
                account_id,
                current_count,
                n_accounts,
                portfolio_id,
                portfolio_name,
                region_to_clean,
                servicecatalog_client,
            )
        except Exception:
            write_error("detach_and_remove_portfolio", account_id, region_to_clean)
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing IMPORTED shared portfolios"
    )
    accepted_shares_paginator = servicecatalog_client.get_paginator(
        "list_accepted_portfolio_shares"
    )
    imported_portfolios = [
        portfolio
        for accepted_page in accepted_shares_paginator.paginate(
            PortfolioShareType="IMPORTED"
        )
        for portfolio in accepted_page.get("PortfolioDetails", [])
    ]
    for portfolio in imported_portfolios:
        portfolio_id = portfolio.get("Id")
        try:
            remove_principals(
                account_id,
                current_count,
                n_accounts,
                portfolio_id,
                region_to_clean,
                servicecatalog_client,
            )
            servicecatalog_client.reject_portfolio_share(
                PortfolioId=portfolio_id,
            )
        except Exception:
            # Same best-effort policy as for owned portfolios above.
            write_error("reject_portfolio_share", account_id, region_to_clean)
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished removing shared portfolios for IMPORTED"
    )


def _delete_portfolio_shares(servicecatalog_client, portfolio_id):
    """Delete every ACCOUNT-type share of ``portfolio_id``."""
    request = dict(PortfolioId=portfolio_id, Type="ACCOUNT")
    shares = []
    while True:
        response = servicecatalog_client.describe_portfolio_shares(**request)
        shares.extend(response.get("PortfolioShareDetails", []))
        page_token = response.get("NextPageToken")
        if not page_token:
            break
        request["PageToken"] = page_token
    for share in shares:
        # NOTE(review): the original passed the share as an OrganizationNode and
        # left an AccountId= variant commented out; that choice is kept as is.
        servicecatalog_client.delete_portfolio_share(
            PortfolioId=portfolio_id,
            OrganizationNode={
                "Type": share.get("Type"),
                "Value": share.get("PrincipalId"),
            },
        )
def detach_and_remove_portfolio(
    account_id,
    current_count,
    n_accounts,
    portfolio_id,
    portfolio_name,
    region_to_clean,
    servicecatalog_client,
):
    """Empty one portfolio and then delete it.

    Runs ``remove_principals``, ``remove_constraints``,
    ``remove_products_from_portfolio`` and ``delete_portfolio`` in that order,
    each with the same arguments, and prints a completion message that names
    the portfolio. Exceptions from any step propagate to the caller.
    """
    shared_args = (
        account_id,
        current_count,
        n_accounts,
        portfolio_id,
        region_to_clean,
        servicecatalog_client,
    )
    ordered_steps = (
        remove_principals,
        remove_constraints,
        remove_products_from_portfolio,
        delete_portfolio,
    )
    for step in ordered_steps:
        step(*shared_args)
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished cleaning: {portfolio_id} / {portfolio_name}"
    )
def delete_portfolio(
    account_id,
    current_count,
    n_accounts,
    portfolio_id,
    region_to_clean,
    servicecatalog_client,
):
    """Delete the portfolio ``portfolio_id`` through ``servicecatalog_client``.

    Prints a progress message before and after the ``delete_portfolio`` call;
    any exception raised by the client propagates to the caller.
    """
    started = f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Deleting portfolio"
    print(started)
    servicecatalog_client.delete_portfolio(Id=portfolio_id)
    finished = f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished deleting portfolio"
    print(finished)
def remove_principals(
    account_id,
    current_count,
    n_accounts,
    portfolio_id,
    region_to_clean,
    servicecatalog_client,
):
    """Disassociate every principal listed for ``portfolio_id``.

    Pages through ``list_principals_for_portfolio`` and calls
    ``disassociate_principal_from_portfolio`` for each entry's
    ``PrincipalARN``. Progress is printed before and after.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing principals"
    )
    pages = servicecatalog_client.get_paginator(
        "list_principals_for_portfolio"
    ).paginate(PortfolioId=portfolio_id)
    # Lazy flattening: pages are still fetched one at a time, as needed.
    entries = (entry for page in pages for entry in page.get("Principals"))
    for entry in entries:
        servicecatalog_client.disassociate_principal_from_portfolio(
            PortfolioId=portfolio_id,
            PrincipalARN=entry.get("PrincipalARN"),
        )
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished removing principals"
    )
def remove_constraints(
    account_id,
    current_count,
    n_accounts,
    portfolio_id,
    region_to_clean,
    servicecatalog_client,
):
    """Delete every constraint listed for ``portfolio_id``.

    Pages through ``list_constraints_for_portfolio`` and calls
    ``delete_constraint`` with each entry's ``ConstraintId``. Progress is
    printed before and after.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing constraints"
    )
    paginator = servicecatalog_client.get_paginator("list_constraints_for_portfolio")
    for page in paginator.paginate(PortfolioId=portfolio_id):
        for detail in page.get("ConstraintDetails"):
            constraint_id = detail.get("ConstraintId")
            servicecatalog_client.delete_constraint(Id=constraint_id)
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished constraints"
    )
def remove_products_from_portfolio(
    account_id,
    current_count,
    n_accounts,
    portfolio_id,
    region_to_clean,
    servicecatalog_client,
):
    """Disassociate every product found in ``portfolio_id`` from it.

    Pages through ``search_products_as_admin`` filtered by the portfolio and
    calls ``disassociate_product_from_portfolio`` with each result's
    ``ProductViewSummary.ProductId``. Progress is printed before and after.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing products from portfolio"
    )
    pages = servicecatalog_client.get_paginator("search_products_as_admin").paginate(
        PortfolioId=portfolio_id,
    )
    # Lazy flattening: pages are still fetched one at a time, as needed.
    details = (item for page in pages for item in page.get("ProductViewDetails"))
    for item in details:
        product_id = item.get("ProductViewSummary").get("ProductId")
        servicecatalog_client.disassociate_product_from_portfolio(
            ProductId=product_id,
            PortfolioId=portfolio_id,
        )
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished removing products from portfolio"
    )
def remove_products_from_account(
    account_id, current_count, n_accounts, region_to_clean, servicecatalog_client
):
    """Delete every product returned by ``search_products_as_admin``.

    Pages through the unfiltered search and calls ``delete_product`` with
    each result's ``ProductViewSummary.ProductId``. Progress is printed before
    and after.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing products from account"
    )
    paginator = servicecatalog_client.get_paginator("search_products_as_admin")
    for page in paginator.paginate():
        for item in page.get("ProductViewDetails"):
            summary = item.get("ProductViewSummary")
            servicecatalog_client.delete_product(Id=summary.get("ProductId"))
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished deleting products from account"
    )
def remove_provisioned_products(
    account_id, current_count, n_accounts, region_to_clean, servicecatalog_client
):
    """Terminate every provisioned product returned by the account-level scan.

    Pages through ``scan_provisioned_products`` with the access-level filter
    ``Account`` / ``self`` and calls ``terminate_provisioned_product`` with
    each entry's ``Id``. Progress is printed before and after.
    """
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Removing provisioned products from account"
    )
    access_filter = {"Key": "Account", "Value": "self"}
    pages = servicecatalog_client.get_paginator("scan_provisioned_products").paginate(
        AccessLevelFilter=access_filter
    )
    # Lazy flattening: pages are still fetched one at a time, as needed.
    provisioned = (item for page in pages for item in page.get("ProvisionedProducts"))
    for item in provisioned:
        servicecatalog_client.terminate_provisioned_product(
            ProvisionedProductId=item.get("Id"),
        )
    print(
        f"[{current_count}/{n_accounts}] {account_id} / {region_to_clean}: Finished deleting provisioned products from account"
    )
# Upper bound on the number of worker processes (multiprocessing.Pool spawns
# processes, not threads, despite the name).
N_THREADS = 100
if __name__ == "__main__":
    # Entry point: list every account in the organization, then run clean()
    # once per (account, region) pair in a process pool.
    print("Getting list of all accounts in the org.")
    orgs_client = boto3.client("organizations")
    org = {
        account.get("Id"): account
        for page in orgs_client.get_paginator("list_accounts").paginate()
        for account in page.get("Accounts")
    }
    # Ignored accounts are dropped up front so the reported numbers match the
    # work that is actually queued.
    account_ids = [
        account_id for account_id in org if account_id not in accounts_to_ignore
    ]
    print(f"There are {len(account_ids)} accounts that need cleaning.")
    print("Starting to delete from the org")
    targets = [
        (account_id, region)
        for account_id in account_ids
        for region in regions_to_clean
    ]
    # The progress total is the real number of tasks; it was previously
    # hard-coded as "accounts * 4" although the region list can have any length.
    n_tasks = len(targets)
    tasks = [
        (account_id, region, position, n_tasks)
        for position, (account_id, region) in enumerate(targets, start=1)
    ]
    if tasks:
        # Never start more workers than there are tasks; Pool(0) would raise,
        # hence the guard for an empty task list.
        with Pool(min(N_THREADS, n_tasks)) as pool:
            pool.starmap(clean, tasks)
    else:
        print("Nothing to clean.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment