Created
May 30, 2024 05:49
-
-
Save mahi424/f2181b630cf99e34767d5e90b6f9e801 to your computer and use it in GitHub Desktop.
Rotate Password of RDS by lambda on scheduled basis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import mysql.connector | |
from sendgrid.helpers.mail import Mail, Email, To, Content | |
from sendgrid import SendGridAPIClient | |
import certifi | |
import string | |
import random | |
import os | |
from urllib3 import disable_warnings, exceptions | |
# Initialize boto3 clients | |
ssm_client = boto3.client('ssm', region_name='ap-south-1') | |
sendgrid_api_key = os.environ['SENDGRID_API_KEY'] | |
email_from = os.environ['EMAIL_FROM'] | |
environment = os.environ['ENV'] | |
# Disable SSL warnings | |
disable_warnings(exceptions.InsecureRequestWarning) | |
os.environ['PYTHONHTTPSVERIFY']='0' | |
os.environ['REQUESTS_CA_BUNDLE']=certifi.where() | |
os.environ['SSL_CERT_FILE']=certifi.where() | |
# Function to get RDS credentials from Parameter Store | |
def get_rds_credentials(): | |
try: | |
parameter = ssm_client.get_parameter( | |
Name=f'/{environment}/redis-rds', | |
WithDecryption=True | |
) | |
#print(f"Retrieved parameter: {parameter}") # Debug statement | |
# Parse the parameter value from key-value pairs to a dictionary | |
parameter_value = parameter['Parameter']['Value'] | |
creds = {} | |
for line in parameter_value.split('\n'): | |
if line.strip(): # Ignore empty lines | |
key, value = line.split('=', 1) | |
creds[key.strip()] = value.strip() | |
return creds | |
except ssm_client.exceptions.ParameterNotFound as e: | |
print(f"Parameter not found: {e}") | |
raise | |
except Exception as e: | |
print(f"Unexpected error: {e}") | |
raise | |
# Function to generate a random password | |
def generate_password(length=12): | |
characters = string.ascii_letters + string.digits | |
return ''.join(random.choice(characters) for i in range(length)) | |
# Function to update the user's password in MySQL | |
def update_mysql_password(db_credentials, user, new_password): | |
try: | |
conn = mysql.connector.connect( | |
host=db_credentials['DB_HOST'], | |
user=db_credentials['DB_USERNAME'], | |
password=db_credentials['DB_PASSWORD'], | |
database='mysql' | |
) | |
cursor = conn.cursor() | |
#print("Connected to DB") | |
query = f"ALTER USER '{user}'@'%' IDENTIFIED BY '{new_password}';" | |
cursor.execute(query) | |
conn.commit() | |
cursor.close() | |
conn.close() | |
except mysql.connector.Error as err: | |
print(f"Error: {err}") | |
raise | |
# Function to send email with the new password using SendGrid | |
def send_email_via_sendgrid(email_to, user, new_password): | |
subject = f"Your New Database Password for {environment} Environment" | |
body = f"Hello {user},\n\nYour new database password for {environment} environment is: {new_password}\n\nBest regards,\nTeam Devops" | |
if not sendgrid_api_key: | |
raise ValueError("SendGrid API key not found. Ensure you have set it in your environment variables.") | |
message = Mail( | |
from_email='devops@domain.in', | |
to_emails=email_to, | |
subject=subject, | |
plain_text_content=body | |
) | |
try: | |
sg = SendGridAPIClient(sendgrid_api_key) | |
response = sg.send(message) | |
return { | |
'statusCode': response.status_code, | |
'body': "Email sent successfully" | |
} | |
except Exception as e: | |
print(f"An error occurred: {e}") | |
return { | |
'statusCode': 500, | |
'body': str(e) | |
} | |
# Function to read users and their emails from the file based on the environment | |
def get_users_from_file(file_path, environment): | |
users = [] | |
try: | |
with open(file_path, 'r') as file: | |
for line in file: | |
# csv file like prod,user,user@domain.in | |
env, username, email = line.strip().split(',') | |
if env == environment: | |
users.append((username, email)) | |
print(f"Username: {username}") | |
except FileNotFoundError as e: | |
print(f"Error: {e}") | |
raise | |
except Exception as e: | |
print(f"Error: {e}") | |
raise | |
return users | |
def lambda_handler(event, context): | |
try: | |
db_credentials = get_rds_credentials() | |
users = get_users_from_file('/var/task/users.csv', environment) # Lambda's working directory | |
for user, email in users: | |
#print(f"Updating passwords for user: {users}") | |
new_password = generate_password() | |
#print(f"new password for {user} is {new_password}") | |
update_mysql_password(db_credentials, user, new_password) | |
#print("Updated password") | |
send_email_via_sendgrid(email, user, new_password) | |
print(f"Password for user {user} rotated and email sent to {email}.") | |
return { | |
'statusCode': 200, | |
'body': 'Passwords rotated and emails sent successfully.' | |
} | |
except Exception as e: | |
print(f"Error: {e}") | |
return { | |
'statusCode': 500, | |
'body': f"Error rotating passwords: {e}" | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment