Skip to content

Instantly share code, notes, and snippets.

@tkalus
Created June 3, 2020 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkalus/9bf1cae852012083314aebfe60f88c42 to your computer and use it in GitHub Desktop.
Save tkalus/9bf1cae852012083314aebfe60f88c42 to your computer and use it in GitHub Desktop.
AWS: Attempt to delete all default VPCs in all active regions, provided they appear unused and unmodified.
#!/usr/bin/env python3
"""AWS: Safely delete all default VPCs in all active regions."""
import logging
import sys
from functools import partial
from itertools import chain
from typing import Callable
from boto3.session import Session
from botocore.exceptions import ClientError
# Module-level logger shared by every function in this file.
log = logging.getLogger(__name__)
def get_inactive_ec2_regions(session: Session):
    """
    Yield the name of every region that is not active (used for logging only).

    :param session: boto3.session.Session for AWS.
    :returns: generator of region names that describe_regions(AllRegions=True)
        reports but get_active_ec2_regions() does not.
    """
    # Resolve the active set first, then walk the complete region listing.
    enabled = set(get_active_ec2_regions(session))
    listing = session.client("ec2").describe_regions(AllRegions=True)
    for entry in listing.get("Regions", []):
        name = entry.get("RegionName")
        if name in enabled:
            continue
        yield name
def get_active_ec2_regions(session: Session):
    """
    Yield the name of every region usable by this account.

    A region counts as active when its opt-in-status is either
    "opt-in-not-required" or "opted-in".

    :param session: boto3.session.Session for AWS.
    :returns: generator of region names, in the order the API returned them.
    """
    usable_statuses = ["opt-in-not-required", "opted-in"]
    response = session.client("ec2").describe_regions(
        Filters=[{"Name": "opt-in-status", "Values": usable_statuses}],
        AllRegions=True,
    )
    yield from (entry.get("RegionName") for entry in response.get("Regions", []))
def get_default_vpcs(session: Session, region_name: str):
    """
    Yield the ID of every VPC in region_name matched by the isDefault filter.

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to query.
    :returns: generator of VPC IDs; empty when the region has no default VPC.
    :raises: possible botocore.exceptions, uncaught.
    """
    response = session.client("ec2", region_name=region_name).describe_vpcs(
        Filters=[{"Name": "isDefault", "Values": ["true"]}]
    )
    # Fall back to an empty list so a response without "Vpcs" yields nothing
    # instead of raising TypeError while iterating None.
    for vpc in response.get("Vpcs") or []:
        yield vpc.get("VpcId")
def vpc_only_has_defaults(session: Session, region_name: str, vpc_id: str):  # noqa:C901
    """
    Inspect vpc_id and report whether it appears unused and unmodified.

    Every check runs even after a failure, and each deviation is logged at
    error level, so one pass reports everything that differs from a pristine
    default VPC.

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to operate in.
    :param vpc_id: VPC ID thought to be a default VPC.
    :returns bool: True if appears to be default; False if not.
    :raises: possible botocore.exceptions, uncaught.
    """
    vpc = session.resource("ec2", region_name=region_name).Vpc(vpc_id)
    is_default = True

    for assoc in chain(
        vpc.cidr_block_association_set or [], vpc.ipv6_cidr_block_association_set or []
    ):
        cidr_block = assoc.get("CidrBlock") or assoc.get("Ipv6CidrBlock")
        if cidr_block == vpc.cidr_block:
            # Safely skip the default cidr_block association
            continue
        state = assoc.get("CidrBlockState", {}).get("State") or assoc.get(
            "Ipv6CidrBlockState", {}
        ).get("State")
        if state != "associated":
            # Old and failed associations are still listed by the API;
            # skip everything but "associated".
            continue
        # Should be no additional CIDR associations
        log.error(f"CIDR Association {cidr_block}:{state} found in {vpc_id}")
        is_default = False

    for peering_connection in chain(
        vpc.accepted_vpc_peering_connections.all() or [],
        vpc.requested_vpc_peering_connections.all() or [],
    ):
        # Should be no Peering Connections
        log.error(f"Peering Connection {peering_connection.id} found in {vpc_id}")
        is_default = False

    for net_iface in vpc.network_interfaces.all():
        # Should be no Network Interfaces
        log.error(f"Network Interface {net_iface.id} found in {vpc_id}")
        is_default = False

    for instance in vpc.instances.all():
        # Should be no Instances
        log.error(f"Instance {instance.id} found in {vpc_id}")
        is_default = False

    for igw in vpc.internet_gateways.all():
        for vid in [a.get("VpcId") for a in igw.attachments]:
            if vid != vpc_id:
                # All IGW attachments should be exclusive to the default VPC
                log.error(
                    f"Non default IGW Attachment ({vid}) found on {igw.id} in {vpc_id}"
                )
                is_default = False

    for subnet in vpc.subnets.all():
        if not subnet.default_for_az:
            # Ensure that no subnets were added after-the-fact
            log.error(f"Non default Subnet ({subnet.id}) found in {vpc_id}")
            is_default = False

    for rtb in vpc.route_tables.all():
        if not all(attrib.get("Main") for attrib in rtb.associations_attribute):
            # Ensure rtb associations are _only_ main
            log.error(f"RTB ({rtb.id}) with a non-main assoc found in {vpc_id}")
            is_default = False
        for route in rtb.routes:
            if route.gateway_id == "local":
                continue
            # Apart from the local route, only a 0.0.0.0/0 destination is
            # accepted. NOTE(review): the target of that route is not checked.
            if route.destination_cidr_block != "0.0.0.0/0":
                log.error(
                    f"Found non-default Route {route.destination_cidr_block} in RTB ({rtb.id}) in {vpc_id}"
                )
                is_default = False

    for nacl in vpc.network_acls.all():
        if not nacl.is_default:
            log.error(f"Non default NACL ({nacl.id}) found in {vpc_id}")
            is_default = False

    # Materialise the collection once: wrapping the collection object itself
    # in a list literal always has length 1, so the count check never fired.
    security_groups = list(vpc.security_groups.all())
    if len(security_groups) > 1:
        log.error(f"Non default SG found in {vpc_id}")
        is_default = False
    for sg in security_groups:
        if sg.group_name != "default":
            log.error(f"Non default SG ({sg.id}) found in {vpc_id}")
            is_default = False

    return is_default
def try_except_dry_run(func: Callable[[], None]) -> None:
    """
    Call func, swallowing only the error AWS raises for a DryRun request.

    :param func: zero-argument callable (typically a functools.partial wrapping
        a boto3 call made with DryRun=...).
    :raises: any ClientError whose error code is not "DryRunOperation" is
        re-raised; other exception types propagate untouched.
    """
    try:
        func()
    except ClientError as e:
        # Anything other than the DryRun marker is a genuine failure.
        if e.response["Error"]["Code"] != "DryRunOperation":
            raise
        log.debug(" DRYRUN: Skipped ^^^")
def delete_default_vpc_if_unmodified(
    session: Session, region_name: str, vpc_id: str, dry_run: bool = True
) -> None:
    """
    Remove the default VPC vpc_id, but only when it looks unused and unmodified.

    Internet gateways are detached and deleted first, then subnets, then every
    non-local route, and finally the VPC itself. Each mutating call goes
    through try_except_dry_run with DryRun=dry_run.

    :param session: boto3.session.Session for AWS.
    :param region_name: AWS Region to operate in.
    :param vpc_id: VPC ID thought to be a default VPC.
    :param dry_run: Whether or not to attempt to delete resources OR log whether you could.
    :raises TypeError: when the VPC looks modified and dry_run is False.
    :raises: possible botocore.exceptions, uncaught.
    """
    pristine = vpc_only_has_defaults(
        session=session, region_name=region_name, vpc_id=vpc_id
    )
    if not pristine:
        # A dry run only reports the problem; a real run refuses loudly.
        if not dry_run:
            raise TypeError(f"VPC {vpc_id} appears to have been modified.")
        log.error(f"VPC {vpc_id} appears to have been modified.")
        return

    default_vpc = session.resource("ec2", region_name=region_name).Vpc(vpc_id)

    for igw in default_vpc.internet_gateways.all():
        detach = partial(igw.detach_from_vpc, DryRun=dry_run, VpcId=vpc_id)
        remove_igw = partial(igw.delete, DryRun=dry_run)
        log.info(f"Detaching igw {igw.id} from VPC {vpc_id}")
        try_except_dry_run(detach)
        log.info(f"Deleting igw {igw.id} from VPC {vpc_id}")
        try_except_dry_run(remove_igw)

    for subnet in default_vpc.subnets.all():
        remove_subnet = partial(subnet.delete, DryRun=dry_run)
        log.info(f"Deleting subnet {subnet.id} from VPC {vpc_id}")
        try_except_dry_run(remove_subnet)

    for rtb in default_vpc.route_tables.all():
        # Local routes cannot be deleted, so they are filtered out up front.
        for route in (r for r in rtb.routes if r.gateway_id != "local"):
            log.info(
                f"Deleting route {route.destination_cidr_block} via"
                f" {route.gateway_id} on rtb {rtb.id} in VPC {vpc_id}"
            )
            try_except_dry_run(partial(route.delete, DryRun=dry_run))
        log.info(f"Route Table {rtb.id} will be deleted with VPC {vpc_id}")

    # NACLs and security groups are only logged here; no delete call is made.
    for nacl in default_vpc.network_acls.all():
        log.info(f"Default NACL {nacl.id} will be deleted wtih VPC {vpc_id}")
    for sg in default_vpc.security_groups.all():
        log.info(f"Default SG {sg.id} will be deleted with VPC {vpc_id}")

    log.info(f"Deleting VPC {vpc_id}")
    try_except_dry_run(partial(default_vpc.delete, DryRun=dry_run))
def main(dry_run: bool = False):
    """
    Log inactive regions, then process each default VPC in every active region.

    :param dry_run: forwarded to delete_default_vpc_if_unmodified for each VPC.
    """
    aws = Session()

    inactive_regions = sorted(get_inactive_ec2_regions(aws))
    for region_name in inactive_regions:
        log.info(f"Inactive REGION: {region_name}")

    active_regions = sorted(get_active_ec2_regions(aws))
    for region_name in active_regions:
        log.info(f"Working on REGION: {region_name}")
        default_vpc_ids = get_default_vpcs(session=aws, region_name=region_name)
        for vpc_id in default_vpc_ids:
            log.info(f" VPC Id: {vpc_id}")
            delete_default_vpc_if_unmodified(
                session=aws,
                region_name=region_name,
                vpc_id=vpc_id,
                dry_run=dry_run,
            )
if __name__ == "__main__":
    # Send this script's own DEBUG output to stderr, but keep the AWS SDK and
    # HTTP libraries quiet unless they report an error.
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
    for noisy_logger in ("botocore", "boto3", "urllib3"):
        logging.getLogger(noisy_logger).setLevel(logging.ERROR)
    main(dry_run=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment