Skip to content

Instantly share code, notes, and snippets.

Last active November 24, 2021 10:04
Show Gist options
  • Save andymotta/cb64ebd71c4703726501fe9a3776ce3d to your computer and use it in GitHub Desktop.
Save andymotta/cb64ebd71c4703726501fe9a3776ce3d to your computer and use it in GitHub Desktop.
Rotate AWS IAM access keys for every Boto profile on host (Compliance)
## Meant to be scheudled on a cron/timer of 90 days (CIS Benchmark)
## The target keys need permissions to rotate themselves
import boto3
from botocore.exceptions import ClientError
import os
from datetime import datetime
import shutil
from ConfigParser import SafeConfigParser
key_file = os.path.join(os.environ['HOME'], '.aws', 'credentials')
parser = SafeConfigParser()
timeStamp = datetime.fromtimestamp(os.path.getmtime(key_file)).strftime("%b-%d-%y-%H:%M:%S")
key_bak = "%s_%s.bak" % (key_file, timeStamp)
def generate_list_from_parser(parser):
lst = []
for profile in parser.sections():
return lst
def get_aws_access_key_id(profile):
return parser.get(profile, 'aws_access_key_id')
def find_user(key):
key_info = iam.get_access_key_last_used(AccessKeyId=key)
return key_info['UserName']
except ClientError as e:
if e.response['Error']['Code'] == 'AccessDenied':
print "%s does not exist in target account" % key
return False
def num_keys():
# See if IAM user already has more than one key
paginator = iam.get_paginator('list_access_keys')
for response in paginator.paginate(UserName=user):
return len(response['AccessKeyMetadata'])
except ClientError as e:
if e.response['Error']['Code'] == 'ParamValidationError':
def delete_inactive_access_key(user):
for access_key in iam.list_access_keys(UserName = user)['AccessKeyMetadata']:
if access_key['Status'] == 'Inactive':
# Delete the access key.
print('Deleting access key {0}.'.format(access_key['AccessKeyId']))
response = iam.delete_access_key(
UserName = user,
AccessKeyId = access_key['AccessKeyId']
except ClientError as e:
if e.response['Error']['Code'] == 'InvalidClientTokenId':
print "Not authorized to perform iam maintainence"
# Create an access key
def create_access_key(user):
response = iam.create_access_key(
AccessKey = response['AccessKey']['AccessKeyId']
SecretAccessKey = response['AccessKey']['SecretAccessKey']
return AccessKey, SecretAccessKey
except ClientError as e:
if e.response['Error']['Code'] == 'LimitExceededException':
print "User already has two keys, cannot add more"
# Change state of first access key to inactive before deleting
# this should only tru to update users with one key
def update_access_key(key, user):
def write_creds(profile, keyid, secret, keyfile):
parser.set(profile, 'aws_access_key_id', keyid)
parser.set(profile, 'aws_secret_access_key', secret)
# Writing our configuration file to 'example.ini'
with open(keyfile, 'wb') as configfile:
#### This should be the start of the main function ####
# first create backup
shutil.copy(key_file, key_bak)
if os.getenv("TARGET_PROFILE"):
# targeting only one account
profiles = os.environ["TARGET_PROFILE"]
profiles = [profiles]
#targeting all accounts
profiles = generate_list_from_parser(parser)
keys = []
for p in profiles:
# you'll need to decide which profile should match the default profile, if applicable
# if p == 'default':
# #skip the default profile because on build servers it matches nonprod
# continue
key = get_aws_access_key_id(p)
if key in keys: # Don't do this twice if default is the same key as some other profile
print "Will not rotate %s, list duplicate." % p
# create a custom session to target account based on profile
os.environ["AWS_PROFILE"] = p
session = boto3.session.Session()
iam = session.client('iam')
# Get the user from the key on the host
if find_user(key):
user = find_user(key)
print "Not rotating %s. Moving on..." % key
# 2. Can we add a key for this user? If not, delete the inactive one
if num_keys() == 2:
print "User " + user +" in "+ p +" account " + "has this many keys:", num_keys() # num_keys debugging
print "Cannot delete inactive access key for " + user
# 3. Add a secondary key for the users we can add keys to
creds = create_access_key(user)
print "Created: " + creds[0] + " in: " + p
# 5. deactivate the original keys for each user
update_access_key(key, user)
print "Successfully deactivated " + key + " in " + p
# 6. rotate user and secret of each profile
print "Writing creds to " + key_file + "..."
write_creds(p, creds[0], creds[1], key_file)
# if p == 'your_default':
# # default is always matched with your_default on build servers
# print "Matching default profile with your_default..."
# write_creds('default', creds[0], creds[1], key_file)
Copy link

In def create_access_key I receive ClientError Code "LimitExceeded" rather than "LimitExceededException".
Any reason for the difference?

Copy link

aladme commented Feb 18, 2020

Hi andy, I'm having errors on your script and i don't know how to solve them .... I've tried it but i cannot.

it's into the "write_creds" function

can you help me please?

Created: AKIAUTAXJLLBU35B6FFO in: default
Successfully deactivated AKIAUTAXJLLBTVNVLA7A in default
Writing creds to /home/alejandro/.aws/credentials...
Traceback (most recent call last):
File "", line 140, in
write_creds(p, creds[0], creds[1], key_file)
File "", line 89, in write_creds
File "/usr/lib/python3.6/", line 919, in write
self._sections[section].items(), d)
File "/usr/lib/python3.6/", line 923, in _write_section
TypeError: a bytes-like object is required, not 'str'

Copy link

@aladme, you should be using

with open(keyfile, 'w') as configfile:

instead of

with open(keyfile, 'wb') as configfile:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment