Skip to content

Instantly share code, notes, and snippets.

@mcfantom-3xm
Created January 31, 2023 14:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mcfantom-3xm/2e0b817eb01808a2627beed252f32c26 to your computer and use it in GitHub Desktop.
Save mcfantom-3xm/2e0b817eb01808a2627beed252f32c26 to your computer and use it in GitHub Desktop.
Lambda function to analyze users access keys and disable according to certain parameter.
import boto3
from botocore.exceptions import ClientError
import datetime
import json
import time
import os
iam_client = boto3.client("iam")
ssm = boto3.client("ssm")
secretmanager = boto3.client("secretsmanager")
# This variable sets the max item count per page when using the list users paginator.
maxItemsPerPage = 50
# This variable sets the max quantity of access keys a user can have
maxNumberOfKeysxUser = 2
# This variable sets the AWS REGION for the boto3 api calls which needs it.
awsRegion = os.environ['AWS_REGION']
# This variable sets if printing log messages is enabled
loggingEnabled = os.environ['loggingEnabled']
# These are global variables used to send just one email to the user
mailMsg = ""
sendMail = False
def get_ssm_parameter(parametername: str, decrypt: bool):
# Looks for SSM parameters.
parametervalue = ""
try:
parameter = ssm.get_parameter(Name=parametername, WithDecryption=decrypt)
parametervalue = parameter["Parameter"]["Value"]
except ClientError as e:
print("ERROR: Error looking for the AWS SSM paramter. The error received was: " + e.response["Error"]["Code"] + " - " + e.response["Error"]["Message"])
finally:
return parametervalue
def get_tag_from_user(username: str, tagname: str):
# Look for a tag within the user's tag list
tagvalue = "NoTagFound"
try:
taglist = iam_client.list_user_tags(UserName=username)
for tag in taglist["Tags"]:
if tag["Key"] == tagname:
tagvalue = tag["Value"]
except ClientError as e:
print("ERROR: Error looking for user's tags.\nThe error received was: " + e.response["Error"]["Code"] + " - " + e.response["Error"]["Message"])
finally:
return tagvalue
def get_user_keyrotation(username):
# This function checks if auto key rotation is enabled for a user
# This is checked from a tag (AutoKeyRotation) that must be attached to the IAM user
userkeyrotation = get_tag_from_user(username=username, tagname="AutoKeyRotation")
if userkeyrotation != "NoTagFound":
if userkeyrotation in ["True", "true", "TRUE"]:
return True
else:
return False
def get_user_email(username):
# This function retrieves the email address from a tag that must be attached to the IAM user
useremail = get_tag_from_user(username=username, tagname="IAMUserMail")
if useremail != "NoTagFound":
return useremail
else:
return "NoEmailForUser"
def get_user_secret(username):
# This function retrieves the AWS Secret name from a tag that must be attached to the IAM user
usersecret = get_tag_from_user(username=username, tagname="SecretForKeys")
if usersecret != "NoTagFound":
return usersecret
else:
return "NoSecretForKeys"
def send_mail(username, msg):
email = get_user_email(username=username)
if email != "NoEmailForUser":
snstopicarn = str(get_ssm_parameter(parametername="SNSTopicARNForKeyRotationNotification", decrypt=False))
if snstopicarn != "":
sns_send_report = boto3.client("sns", region_name=awsRegion)
response = sns_send_report.publish(TopicArn=snstopicarn, Message=msg, Subject="Previous Key Deactivated")
if loggingEnabled == "True": print("LOG: Mail sent. The response was: " + str(response))
else:
print("ERROR: Could not send email notification.\nThe AWS SNS Topic for email notification was not found.\nPlease check the existence/value of AWS SSM parameter 'SNSTopicARNForKeyRotationNotification'")
else:
if loggingEnabled == "True": print("INFO: The user " + username + " does not have an email address associated.\nCheck if the IAM user has the IAMUserMail tag with a valid email address value.")
def disable_key(access_key, username):
global mailMsg
global sendMail
try:
iam_client.update_access_key(UserName=username, AccessKeyId=access_key, Status="Inactive")
if loggingEnabled == "True": print("INFO: The access key Id: '" + access_key + "' from the user '" + username + "' has been disabled.")
mailMsg = mailMsg + "The access key with Id: '" + access_key + "' from the user '" + username + "' has been disabled.\n\n"
sendMail = True
except ClientError as e:
print("ERROR: The access key with id " + access_key + " cannot be found.\nThe error received was: " + e.response["Error"]["Code"] + " - " + e.response["Error"]["Message"])
def delete_key(access_key, username):
global mailMsg
global sendMail
try:
iam_client.delete_access_key(UserName=username, AccessKeyId=access_key)
if loggingEnabled == "True": print("INFO: The access key ID: " + access_key + " from user '" + username + "' has been deleted.")
mailMsg = mailMsg + "The access key ID: " + access_key + " from user '" + username + "' has been deleted.\n\n"
sendMail = True
except ClientError as e:
print("ERROR: The access key with id " + access_key + " cannot be found.\nThe error received was: " + e.response["Error"]["Code"] + " - " + e.response["Error"]["Message"])
def create_key(username):
global mailMsg
global sendMail
try:
response = iam_client.create_access_key(UserName=username)
accesskey = response["AccessKey"]["AccessKeyId"]
secretkey = response["AccessKey"]["SecretAccessKey"]
if loggingEnabled == "True": print("The access key ID: " + accesskey + " for user '" + username + "' has been created.")
json_data = json.dumps({"AccessKey": accesskey, "SecretKey": secretkey})
usersecretname = get_user_secret(username=username)
secmgrresponse = secretmanager.put_secret_value(SecretId=usersecretname, SecretString=json_data)
if loggingEnabled == "True": print("The secret ID: " + usersecretname + " was written with a new value.")
mailMsg = mailMsg + "New access key with ID: '" + accesskey + "' has been created for user '" + username + "'.\n\n" + \
"Please, check your secret to obtain the secret access key.\n" + \
"https://" + awsRegion + ".console.aws.amazon.com/secretsmanager/home?region=" + awsRegion + "#!/secret?name=" + usersecretname + "\n\n" + \
"If you have any problems, please contact your AWS resources administrator."
sendMail = True
except ClientError as e:
print("ERROR: The new access key could not be created.\nThe error received was: " + e.response["Error"]["Code"] + " - " + e.response["Error"]["Message"])
def get_key(username, status_filter):
keydetails = iam_client.list_access_keys(UserName=username)
key_details = {}
user_iam_details = []
for key in keydetails["AccessKeyMetadata"]:
if key["Status"] == status_filter:
key_details["AccessKeyId"] = key["AccessKeyId"]
user_iam_details.append(key_details)
return user_iam_details
def lambda_handler(event, context):
global mailMsg
global sendMail
try:
parameter = ssm.get_parameter(Name="DaysForAccessKeyRotation", WithDecryption=False)
maxageforaccesskeys = float(str(parameter["Parameter"]["Value"]))
except ClientError as e:
maxageforaccesskeys = 90
if loggingEnabled == "True": print("LOG: The parameter 'DaysForAccessKeyRotation' is not present. Using the default value (90 days) for the access key analisys.")
if loggingEnabled == "True": print("LOG: Checking the users in chunks/pages of " + str(maxItemsPerPage) + " users per chunk/page")
# Listing all the users, using pagination just in case there are a lot of users.
# If there are more than 50 users, normally the AWS api will paginate the call.
listusers = True
marker = None
while listusers:
if marker:
response_iterator = iam_client.list_users(MaxItems=maxItemsPerPage, Marker=marker)
else:
response_iterator = iam_client.list_users(MaxItems=maxItemsPerPage)
for user in response_iterator['Users']:
if loggingEnabled == "True": print("LOG: Analizing the user " + user["UserName"])
mailMsg = ""
sendMail = False
if get_user_keyrotation(username=user["UserName"]):
if loggingEnabled == "True": print("LOG: Automatic access key rotation enabled for this user")
res = iam_client.list_access_keys(UserName=user["UserName"])
# Checking the user's access key age, if it has any
inactivecounter = 0
disabledkeyscounter = 0
totalnumberofkeys = len(res["AccessKeyMetadata"])
if totalnumberofkeys != 0:
for accessKeyMetadata in res["AccessKeyMetadata"]:
if accessKeyMetadata["Status"] == "Active":
accesskeydate = accessKeyMetadata["CreateDate"]
accesskeydate = accesskeydate.strftime("%Y-%m-%d %H:%M:%S")
currentdate = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
accesskeyd = time.mktime(datetime.datetime.strptime(accesskeydate, "%Y-%m-%d %H:%M:%S").timetuple())
currentd = time.mktime(datetime.datetime.strptime(currentdate, "%Y-%m-%d %H:%M:%S").timetuple())
# We get the data in seconds, converting it to days
active_days = (currentd - accesskeyd) / 60 / 60 / 24
if loggingEnabled == "True": print("LOG: Checking access key id: " + accessKeyMetadata["AccessKeyId"] + " - Key age:" + str(int(round(active_days))))
if active_days > float(maxageforaccesskeys):
# Prior to disable the key we look if there are another disabled key.
# If so, the disabled key is deleted in order to make "room" for a new one.
# As of May,2021 the max number of keys per user is 2.
user_inactive_keys = get_key(username=user["UserName"], status_filter="Inactive")
if len(user_inactive_keys) != 0:
if loggingEnabled == "True": print("LOG: Deleting key ID: " + str(user_inactive_keys[0]["AccessKeyId"]) + " in order to be able to create a new access key.")
delete_key(access_key=user_inactive_keys[0]["AccessKeyId"], username=user["UserName"])
if loggingEnabled == "True":print("LOG: Disabling key ID: " + accessKeyMetadata["AccessKeyId"])
disable_key(access_key=accessKeyMetadata["AccessKeyId"], username=user["UserName"])
disabledkeyscounter += 1
elif accessKeyMetadata["Status"] == "Inactive":
inactivecounter += 1
if disabledkeyscounter == maxNumberOfKeysxUser:
if loggingEnabled == "True": print("LOG: The user had the max number of keys active.\nAll the keys have been disabled.\nNo new keys were created.\nPlease check why the user had 2 active keys and if they need to be rotated.")
else:
if loggingEnabled == "True": print("LOG: Creating new key for user: " + user["UserName"])
create_key(username=user["UserName"])
else:
if loggingEnabled == "True": print("LOG: The user " + user["UserName"] + " has the tag for automatic access key rotation but has no access key assigned yet.")
if sendMail:
send_mail(username=user["UserName"], msg=mailMsg)
try:
marker = response_iterator['Marker']
if loggingEnabled == "True": print("LOG: There are more chunks/pages of users to check.")
except KeyError:
if loggingEnabled == "True": print("LOG: There are no more chunks/pages of users to check.")
listusers = False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment