Skip to content

Instantly share code, notes, and snippets.

@altairlage
Last active May 2, 2019 01:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save altairlage/a86cf44ae9703ae1a7a7707b16448485 to your computer and use it in GitHub Desktop.
Save altairlage/a86cf44ae9703ae1a7a7707b16448485 to your computer and use it in GitHub Desktop.
Delete AWS IoT things based on type or thing name detaching principals and deleting certificates (python 3 iot)
import logging
import argparse
import os
import botocore
import json
import boto3
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)
# ============================================================================================
parser = argparse.ArgumentParser()
parser.add_argument(
'--region', '-r',
type=str,
required=True,
help='AWS region'
)
parser.add_argument(
'--thingtype', '-t',
type=str,
required=False,
help='ThingTypeName to be used to list things'
)
parser.add_argument(
'--thingname', '-n',
type=str,
required=False,
help='ThingName to be used to delete thing'
)
parser.add_argument(
'--profile', '-p',
type=str,
required=False,
help='AWS config profile name to connect'
)
def deleteThingsByType(thing_type_name):
paginator = iot_client.get_paginator('list_things')
page_iterator = paginator.paginate(
thingTypeName=thing_type_name,
PaginationConfig={
'PageSize': 50
}
)
for page in page_iterator:
for thing in page['things']:
deleteThing(thing['thingName'], thing['version'])
logging.info("All things deleted!")
def listThingsByType(thing_type_name):
logging.info("Listing things in AWS Environment: " + aws_env)
logging.info("Listing things in AWS Region: " + aws_region)
logging.info("Listing things of the type: " + thing_type_name)
paginator = iot_client.get_paginator('list_things')
page_iterator = paginator.paginate(
thingTypeName=thing_type_name,
PaginationConfig={
'PageSize': 50
}
)
number_of_things = 0
for page in page_iterator:
for thing in page['things']:
logging.info("- " + thing['thingName'] + " - version: " + str(thing['version']) + " - thingTypeName: " + thing['thingTypeName'])
number_of_things += 1
logging.info("\n\n\n- Number of things to delete: " + str(number_of_things))
return number_of_things
def cert_arn_to_id(arn):
position = arn.find(':cert/')
return arn[position + 6:]
def detach_inactivate_delete_certs(cert_arns):
for arn in cert_arns:
# Deactivates the cert
logging.info("- Deactivating cert: " + cert_arn_to_id(arn))
response = iot_client.update_certificate(
certificateId=cert_arn_to_id(arn),
newStatus='INACTIVE'
)
# Lists cert policies and detaches them
principals = iot_client.list_principal_policies(
principal = arn
)
for policy in principals['policies']:
logging.info("Detaching policy " + policy['policyName'] + " ARN: " + policy['policyArn'])
iot_client.detach_principal_policy(
policyName=policy['policyName'],
principal=arn
)
# Deletes the cert
logging.info("- Deleting cert: " + cert_arn_to_id(arn))
iot_client.delete_certificate(
certificateId=cert_arn_to_id(arn),
forceDelete=True
)
def deleteThing(thing_name, thing_version):
# Get the list of principals
principals_response = iot_client.list_thing_principals(thingName=thing_name)
principals_list = principals_response['principals']
cert_arns = []
if principals_list:
if len(principals_list) > 0:
# Get the list of certificates
for principal in principals_list:
if ":cert/" in principal:
cert_arns.append(principal)
logging.info("Thing " + thing_name + " has principals attached: " + json.dumps(principals_list, sort_keys=True, indent=4))
logging.info("Dettaching principals!")
try:
for principal in principals_list:
iot_client.detach_thing_principal(
thingName=thing_name,
principal=principal
)
except Exception as e:
logging.error("Error while dettaching principals from thing " + thing_name)
raise e
logging.info("Deleting thing with name " + thing_name + " and version " + str(thing_version))
try:
iot_client.delete_thing(thingName = thing_name, expectedVersion = thing_version)
logging.info("Thing with name " + thing_name + " deleted!")
logging.info("Inactivating, Detaching and Deleting the certificates related to " + thing_name)
detach_inactivate_delete_certs(cert_arns)
except Exception as e:
logging.error("Error while deleting thing " + thing_name)
raise e
def describeThing(thing_name):
try:
describe = iot_client.describe_thing(thingName=thing_name)
if not describe['thingName']:
return None
else:
logging.info("Thing details: " + json.dumps(describe, sort_keys=True, indent=4))
return describe
except Exception as e:
logging.error("No things found with the given name " + thing_name)
raise e
args = parser.parse_args()
# Setup Vars
aws_region = args.region.lower()
thing_type_name = args.thingtype
thing_name = args.thingname
aws_profile = args.profile
auth = boto3.Session(profile_name=aws_profile)
iot_client = auth.client(
"iot",
region_name=aws_region
)
logging.info("---------------------------------------------------------------------------------------------")
if thing_type_name or thing_name:
if thing_type_name:
count = listThingsByType(thing_type_name)
if count > 0:
deleteThingsByType(thing_type_name)
else:
raise Exception("No Iot thing found for the given type: " + thing_type_name)
else:
describe = describeThing(thing_name)
if describe:
deleteThing(describe['thingName'], describe['version'])
else:
raise Exception("No Iot thing found for the given name: " + thing_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment