/run_service_key_rotation.py Secret
Created
May 26, 2021 02:18
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import json | |
import pandas as pd | |
from ds_helpers import aws, db | |
def get_accounts(db_conn):
    """Fetch the service-account usernames and their secret names.

    Runs a fixed query against ``aws_management.service_access_keys``
    through ``pd.read_sql`` on the supplied connection.

    Args:
        db_conn: Database connection handed straight to ``pd.read_sql``.

    Returns:
        A ``(usernames, secret_names)`` tuple of two lists built from the
        ``username`` and ``secret_name`` columns, in result order.
    """
    query = (
        "\n"
        "select username, secret_name\n"
        "from aws_management.service_access_keys;\n"
    )
    accounts = pd.read_sql(query, db_conn)
    usernames = accounts['username'].tolist()
    secret_names = accounts['secret_name'].tolist()
    return usernames, secret_names
def get_current_access_key(client, username):
    """Return the ID of an existing access key for ``username``.

    Walks the ``list_access_keys`` paginator and returns the
    ``AccessKeyId`` of the first key entry it encounters.

    Args:
        client: Client object exposing ``get_paginator`` (an IAM client in
            the calling code).
        username: Value passed as ``UserName`` to the paginator.

    Returns:
        The ``AccessKeyId`` of the first listed key, or ``None`` when no
        page contains a key entry.
    """
    paginator = client.get_paginator('list_access_keys')
    for page in paginator.paginate(UserName=username):
        # An empty or missing key list used to blow up on ``[0]``; treat it
        # as "nothing on this page" and keep looking instead.
        for key_metadata in page.get('AccessKeyMetadata') or []:
            return key_metadata.get('AccessKeyId')
    return None
def create_new_access_keys(client, username):
    """Create a new access key for ``username`` and return its two parts.

    Args:
        client: Client object exposing ``create_access_key``.
        username: Value passed as ``UserName`` to ``create_access_key``.

    Returns:
        A ``(access_key_id, secret_access_key)`` tuple read from the
        ``AccessKey`` entry of the response; a part missing from that
        entry comes back as ``None``.
    """
    created = client.create_access_key(UserName=username)['AccessKey']
    return created.get('AccessKeyId'), created.get('SecretAccessKey')
def update_access_keys_in_secrets_manager(client, secret_name, access_keys):
    """Store an access key pair in the secret named ``secret_name``.

    The pair is serialised as a JSON object with the string fields
    ``access_key`` and ``secret_key`` and sent via ``update_secret``.

    Args:
        client: Client object exposing ``update_secret``.
        secret_name: Value passed as ``SecretId``.
        access_keys: Indexable pair; element 0 is written as
            ``access_key`` and element 1 as ``secret_key``.

    Returns:
        None. The ``update_secret`` response is not used.
    """
    key_id = access_keys[0]
    secret_key = access_keys[1]
    payload = {
        "access_key": f"{key_id}",
        "secret_key": f"{secret_key}",
    }
    secret_string = json.dumps(payload)
    client.update_secret(SecretId=secret_name, SecretString=secret_string)
def delete_current_access_keys(client, access_key, username):
    """Delete one access key belonging to ``username``.

    Args:
        client: Client object exposing ``delete_access_key``.
        access_key: Value passed as ``AccessKeyId``.
        username: Value passed as ``UserName``.

    Returns:
        None. The ``delete_access_key`` response is discarded.
    """
    request = {'AccessKeyId': access_key, 'UserName': username}
    client.delete_access_key(**request)
def main():
    """Rotate the access key of every account listed in the database.

    For each ``(username, secret_name)`` pair returned by ``get_accounts``:
    look up the existing access key, create a new one, write the new pair
    to the account's secret, then delete the old key. The old key is only
    removed after the new pair has been stored.
    """
    iam_client = boto3.client('iam')
    secrets_manager_client = boto3.client("secretsmanager")
    db_conn = db.connect_to_mysql(aws.get_secrets_manager_secret('aws-key-management-db'))
    usernames, secrets = get_accounts(db_conn)
    # NOTE(review): only a single existing key per user is looked up and
    # removed; IAM caps the number of keys per user, so an account that
    # already holds several keys may fail at creation - confirm.
    for username, secret in zip(usernames, secrets):
        current_access_key = get_current_access_key(iam_client, username)
        new_access_key_pair = create_new_access_keys(iam_client, username)
        update_access_keys_in_secrets_manager(secrets_manager_client, secret, new_access_key_pair)
        # The lookup can come back as None (nothing listed for the user);
        # in that case there is no old key to delete.
        if current_access_key is not None:
            delete_current_access_keys(iam_client, current_access_key, username)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment