Last active
May 2, 2019 01:39
-
-
Save altairlage/a86cf44ae9703ae1a7a7707b16448485 to your computer and use it in GitHub Desktop.
Delete AWS IoT things based on type or thing name detaching principals and deleting certificates (python 3 iot)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import argparse | |
import os | |
import botocore | |
import json | |
import boto3 | |
# Timestamped, level-tagged log lines; INFO and above are emitted.
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

# ============================================================================================
# Command-line interface.
# --region is mandatory. --thingtype and --thingname select what to delete;
# --profile picks a named AWS config profile.
parser = argparse.ArgumentParser(
    description=(
        'Delete AWS IoT things by thing type or thing name, detaching '
        'principals and deleting their certificates.'
    )
)
parser.add_argument('--region', '-r', type=str, required=True, help='AWS region')
parser.add_argument(
    '--thingtype', '-t', type=str, required=False,
    help='ThingTypeName to be used to list things'
)
parser.add_argument(
    '--thingname', '-n', type=str, required=False,
    help='ThingName to be used to delete thing'
)
parser.add_argument(
    '--profile', '-p', type=str, required=False,
    help='AWS config profile name to connect'
)


def deleteThingsByType(thing_type_name):
    """Delete every thing returned by ``list_things`` for the given thing type.

    Results are fetched 50 per page and each thing is handed to
    ``deleteThing`` together with the version reported in the listing.

    Args:
        thing_type_name: Value passed as ``thingTypeName`` to ``list_things``.
    """
    pages = iot_client.get_paginator('list_things').paginate(
        thingTypeName=thing_type_name,
        PaginationConfig={'PageSize': 50},
    )
    for current_page in pages:
        for entry in current_page['things']:
            deleteThing(entry['thingName'], entry['version'])
    logging.info("All things deleted!")


def listThingsByType(thing_type_name):
    """Log every thing of the given type and return how many were found.

    Args:
        thing_type_name: Value passed as ``thingTypeName`` to ``list_things``.

    Returns:
        int: Number of things listed across all pages.
    """
    # The original logged an undefined ``aws_env`` name (NameError at runtime).
    # The profile name is the closest thing this script has to an
    # "environment"; %s formatting also copes with a profile of None.
    logging.info("Listing things in AWS Environment: %s", aws_profile)
    logging.info("Listing things in AWS Region: %s", aws_region)
    logging.info("Listing things of the type: %s", thing_type_name)

    pages = iot_client.get_paginator('list_things').paginate(
        thingTypeName=thing_type_name,
        PaginationConfig={'PageSize': 50},
    )

    number_of_things = 0
    for page in pages:
        for thing in page['things']:
            logging.info(
                "- %s - version: %s - thingTypeName: %s",
                thing['thingName'], thing['version'], thing['thingTypeName'],
            )
            number_of_things += 1

    logging.info("\n\n\n- Number of things to delete: %s", number_of_things)
    return number_of_things


def cert_arn_to_id(arn):
    """Return the certificate ID, i.e. the text after the first ``:cert/`` in *arn*.

    Args:
        arn: Certificate ARN string containing ``:cert/<id>``.

    Returns:
        str: Everything following the first ``:cert/`` marker.

    Raises:
        ValueError: If *arn* has no ``:cert/`` marker or nothing follows it.
            The previous ``find``-based slice silently returned ``arn[5:]``
            in the no-marker case.
    """
    _, marker, cert_id = arn.partition(':cert/')
    if not marker or not cert_id:
        raise ValueError("Not a certificate ARN: {!r}".format(arn))
    return cert_id


def detach_inactivate_delete_certs(cert_arns):
    """Deactivate, strip of policies, and delete each certificate in *cert_arns*.

    For every ARN, in order: set the certificate status to ``INACTIVE``,
    detach each policy listed for that principal, then delete the
    certificate with ``forceDelete=True``.

    Args:
        cert_arns: Iterable of certificate ARN strings.
    """
    for arn in cert_arns:
        # Derive the ID once; it is needed for logging, deactivation and deletion.
        cert_id = cert_arn_to_id(arn)

        # Deactivates the cert
        logging.info("- Deactivating cert: " + cert_id)
        iot_client.update_certificate(certificateId=cert_id, newStatus='INACTIVE')

        # Lists cert policies and detaches them.
        # NOTE(review): the AWS IoT API reference marks ListPrincipalPolicies /
        # DetachPrincipalPolicy as deprecated in favour of ListAttachedPolicies /
        # DetachPolicy. They are kept here so the required IAM permissions do
        # not change; confirm before migrating.
        attached = iot_client.list_principal_policies(principal=arn)
        for policy in attached['policies']:
            logging.info("Detaching policy " + policy['policyName'] + " ARN: " + policy['policyArn'])
            iot_client.detach_principal_policy(
                policyName=policy['policyName'],
                principal=arn
            )

        # Deletes the cert
        logging.info("- Deleting cert: " + cert_id)
        iot_client.delete_certificate(certificateId=cert_id, forceDelete=True)


def deleteThing(thing_name, thing_version):
    """Delete one IoT thing and then the certificates that were attached to it.

    Steps, in order: list the thing's principals, detach each one from the
    thing, delete the thing (guarded by ``expectedVersion``), and finally
    pass the certificate principals to ``detach_inactivate_delete_certs``.

    Args:
        thing_name: Name of the thing to delete.
        thing_version: Version passed as ``expectedVersion`` to ``delete_thing``.

    Raises:
        Exception: Any error from the IoT client is logged and re-raised unchanged.
    """
    # Get the list of principals (treat a missing/None list as "none attached").
    principals_response = iot_client.list_thing_principals(thingName=thing_name)
    principals_list = principals_response.get('principals') or []

    # Only certificate principals are cleaned up once the thing is gone.
    cert_arns = [principal for principal in principals_list if ":cert/" in principal]

    if principals_list:
        logging.info("Thing " + thing_name + " has principals attached: " + json.dumps(principals_list, sort_keys=True, indent=4))
        logging.info("Dettaching principals!")
        try:
            for principal in principals_list:
                iot_client.detach_thing_principal(
                    thingName=thing_name,
                    principal=principal
                )
        except Exception:
            logging.error("Error while dettaching principals from thing " + thing_name)
            raise

    logging.info("Deleting thing with name " + thing_name + " and version " + str(thing_version))
    try:
        iot_client.delete_thing(thingName=thing_name, expectedVersion=thing_version)
    except Exception:
        logging.error("Error while deleting thing " + thing_name)
        raise
    logging.info("Thing with name " + thing_name + " deleted!")

    # Certificate cleanup runs after the thing is already deleted, so a failure
    # here gets its own message instead of "Error while deleting thing".
    logging.info("Inactivating, Detaching and Deleting the certificates related to " + thing_name)
    try:
        detach_inactivate_delete_certs(cert_arns)
    except Exception:
        logging.error("Error while deleting certificates related to thing " + thing_name)
        raise


def describeThing(thing_name):
    """Fetch and log the description of a single thing.

    Args:
        thing_name: Value passed as ``thingName`` to ``describe_thing``.

    Returns:
        dict | None: The ``describe_thing`` response, or ``None`` when the
        response carries no ``thingName``.

    Raises:
        Exception: Errors from the IoT client propagate unchanged. Only a
            ``ResourceNotFoundException`` is logged as "No things found";
            the previous blanket handler used that message for every
            failure, including credential and throttling errors.
    """
    try:
        describe = iot_client.describe_thing(thingName=thing_name)
    except iot_client.exceptions.ResourceNotFoundException:
        logging.error("No things found with the given name " + thing_name)
        raise

    if not describe.get('thingName'):
        return None

    logging.info("Thing details: " + json.dumps(describe, sort_keys=True, indent=4))
    return describe


args = parser.parse_args()

# Setup Vars
aws_region = args.region.lower()
thing_type_name = args.thingtype
thing_name = args.thingname
aws_profile = args.profile

# Fail fast when no selector was given. Previously the script created a
# session, logged a separator and exited successfully without doing anything.
if not (thing_type_name or thing_name):
    parser.error("one of --thingtype/-t or --thingname/-n is required")

# A profile of None lets boto3 fall back to its default credential resolution.
auth = boto3.Session(profile_name=aws_profile)
iot_client = auth.client(
    "iot",
    region_name=aws_region
)

logging.info("---------------------------------------------------------------------------------------------")

# --thingtype takes precedence when both selectors are supplied.
if thing_type_name:
    count = listThingsByType(thing_type_name)
    if count > 0:
        deleteThingsByType(thing_type_name)
    else:
        raise Exception("No Iot thing found for the given type: " + thing_type_name)
else:
    describe = describeThing(thing_name)
    if describe:
        deleteThing(describe['thingName'], describe['version'])
    else:
        raise Exception("No Iot thing found for the given name: " + thing_name)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment