Skip to content

Instantly share code, notes, and snippets.

@mahi424
Created May 30, 2024 05:49
Show Gist options
  • Save mahi424/f2181b630cf99e34767d5e90b6f9e801 to your computer and use it in GitHub Desktop.
Save mahi424/f2181b630cf99e34767d5e90b6f9e801 to your computer and use it in GitHub Desktop.
Rotate Password of RDS by lambda on scheduled basis
import boto3
import mysql.connector
from sendgrid.helpers.mail import Mail, Email, To, Content
from sendgrid import SendGridAPIClient
import certifi
import string
import random
import os
from urllib3 import disable_warnings, exceptions
# Initialize boto3 clients
ssm_client = boto3.client('ssm', region_name='ap-south-1')
sendgrid_api_key = os.environ['SENDGRID_API_KEY']
email_from = os.environ['EMAIL_FROM']
environment = os.environ['ENV']
# Disable SSL warnings
disable_warnings(exceptions.InsecureRequestWarning)
os.environ['PYTHONHTTPSVERIFY']='0'
os.environ['REQUESTS_CA_BUNDLE']=certifi.where()
os.environ['SSL_CERT_FILE']=certifi.where()
# Function to get RDS credentials from Parameter Store
def get_rds_credentials():
try:
parameter = ssm_client.get_parameter(
Name=f'/{environment}/redis-rds',
WithDecryption=True
)
#print(f"Retrieved parameter: {parameter}") # Debug statement
# Parse the parameter value from key-value pairs to a dictionary
parameter_value = parameter['Parameter']['Value']
creds = {}
for line in parameter_value.split('\n'):
if line.strip(): # Ignore empty lines
key, value = line.split('=', 1)
creds[key.strip()] = value.strip()
return creds
except ssm_client.exceptions.ParameterNotFound as e:
print(f"Parameter not found: {e}")
raise
except Exception as e:
print(f"Unexpected error: {e}")
raise
# Function to generate a random password
def generate_password(length=12):
characters = string.ascii_letters + string.digits
return ''.join(random.choice(characters) for i in range(length))
# Function to update the user's password in MySQL
def update_mysql_password(db_credentials, user, new_password):
try:
conn = mysql.connector.connect(
host=db_credentials['DB_HOST'],
user=db_credentials['DB_USERNAME'],
password=db_credentials['DB_PASSWORD'],
database='mysql'
)
cursor = conn.cursor()
#print("Connected to DB")
query = f"ALTER USER '{user}'@'%' IDENTIFIED BY '{new_password}';"
cursor.execute(query)
conn.commit()
cursor.close()
conn.close()
except mysql.connector.Error as err:
print(f"Error: {err}")
raise
# Function to send email with the new password using SendGrid
def send_email_via_sendgrid(email_to, user, new_password):
subject = f"Your New Database Password for {environment} Environment"
body = f"Hello {user},\n\nYour new database password for {environment} environment is: {new_password}\n\nBest regards,\nTeam Devops"
if not sendgrid_api_key:
raise ValueError("SendGrid API key not found. Ensure you have set it in your environment variables.")
message = Mail(
from_email='devops@domain.in',
to_emails=email_to,
subject=subject,
plain_text_content=body
)
try:
sg = SendGridAPIClient(sendgrid_api_key)
response = sg.send(message)
return {
'statusCode': response.status_code,
'body': "Email sent successfully"
}
except Exception as e:
print(f"An error occurred: {e}")
return {
'statusCode': 500,
'body': str(e)
}
# Function to read users and their emails from the file based on the environment
def get_users_from_file(file_path, environment):
users = []
try:
with open(file_path, 'r') as file:
for line in file:
# csv file like prod,user,user@domain.in
env, username, email = line.strip().split(',')
if env == environment:
users.append((username, email))
print(f"Username: {username}")
except FileNotFoundError as e:
print(f"Error: {e}")
raise
except Exception as e:
print(f"Error: {e}")
raise
return users
def lambda_handler(event, context):
try:
db_credentials = get_rds_credentials()
users = get_users_from_file('/var/task/users.csv', environment) # Lambda's working directory
for user, email in users:
#print(f"Updating passwords for user: {users}")
new_password = generate_password()
#print(f"new password for {user} is {new_password}")
update_mysql_password(db_credentials, user, new_password)
#print("Updated password")
send_email_via_sendgrid(email, user, new_password)
print(f"Password for user {user} rotated and email sent to {email}.")
return {
'statusCode': 200,
'body': 'Passwords rotated and emails sent successfully.'
}
except Exception as e:
print(f"Error: {e}")
return {
'statusCode': 500,
'body': f"Error rotating passwords: {e}"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment