Skip to content

Instantly share code, notes, and snippets.

@danielpsf
Last active May 8, 2023 16:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielpsf/66150596565c574d912b5f736022a432 to your computer and use it in GitHub Desktop.
Save danielpsf/66150596565c574d912b5f736022a432 to your computer and use it in GitHub Desktop.
Script to terminate RDS instances without connection
import datetime
import boto3
import botocore
import argparse
from distutils.util import strtobool
from datetime import datetime, timedelta
def define_parameters():
parser = argparse.ArgumentParser(
usage="python terminate_rds_without_connection.py --profile YOUR_PROFILE --region YOUR_REGION --dry-run True|False --shutdown True|False",
description="Terminate or shurdown all RDS instances with 0 connections"
)
parser.add_argument('--profile', help="Specify a given AWS SDK profile to be used. It can be 'default' if you didn't configure anything else", required=True)
parser.add_argument('--region', help="Specify a given AWS Region to be used.", required=True)
parser.add_argument('--shutdown', help="It specifies if you want to shutdown the instances. If this is set to False, it will terminate the instances instead of shut them down. When shutting down an instance a DB Snapshot is generated", default=True, type=lambda x: bool(strtobool(x)))
parser.add_argument('--dry-run', help="It runs in dry-run mode, which means it won't perform the destructive action, but will just report on what it can happen if you run without the flag.", default=True, type=lambda x: bool(strtobool(x)))
return parser
class RDSTermination:
#Strandard constructor for RDSTermination class
def __init__(self, cloudwatch_object, rds_object):
self.cloudwatch_object = cloudwatch_object
self.rds_object = rds_object
#Getter and setters for variables.
@property
def cloudwatch_object(self):
return self._cloudwatch_object
@cloudwatch_object.setter
def cloudwatch_object(self, cloudwatch_object):
self._cloudwatch_object = cloudwatch_object
@property
def rds_object(self):
return self._rds_object
@rds_object.setter
def rds_object(self, rds_object):
self._rds_object = rds_object
# Fetch connections details for all the RDS instances.Filter the list and return
# only those instances which are having 0 connections at the time of this script run
def _get_instance_connection_info(self):
rds_instances_connection_details = {}
response = self.cloudwatch_object.get_metric_data(
# This is how you would capture RDS's other important metrics to build a table
# {
# 'Id': 'fetching_cpu_utilization',
# 'Expression': "SEARCH('{AWS/RDS,DBInstanceIdentifier} MetricName=\"CPUUtilization\"', 'Average', 1209600)",
# 'ReturnData': True
# },
# {
# 'Id': 'fetching_freeable_memory',
# 'Expression': "SEARCH('{AWS/RDS,DBInstanceIdentifier} MetricName=\"FreeableMemory\"', 'Average', 1209600)",
# 'ReturnData': True
# },
MetricDataQueries=[
{
'Id': 'fetching_data_for_something',
'Expression': "SEARCH('{AWS/RDS,DBInstanceIdentifier} MetricName=\"DatabaseConnections\"', 'Average', 1209600)",
'ReturnData': True
},
],
EndTime=datetime.utcnow(),
StartTime=datetime.utcnow() - timedelta(days=14),
ScanBy='TimestampDescending',
MaxDatapoints=123
)
# response is of type dictionary with MetricDataResults as key
for instance_info in response['MetricDataResults']:
if len(instance_info['Timestamps']) > 0:
rds_instances_connection_details[instance_info['Label']] = instance_info['Values'][-1]
return rds_instances_connection_details
# If the DB Instance is marked with a tag `keep-alive` set to true
def _getKeepAliveValue(self, instance):
# return bool(strtobool(next((tag['Value'] for tag in instance["TagList"] if str(tag['Key']).lower() == 'keep-alive'), "False")))
return False
# Fetches list of all instances and there status.
def _fetch_all_rds_instance_state(self):
all_rds_instance_state = {}
response = self.rds_object.describe_db_instances()
instance_details = response['DBInstances']
for instance in instance_details:
all_rds_instance_state[instance['DBInstanceIdentifier']] = {'status': instance['DBInstanceStatus'], 'keep_alive': self._getKeepAliveValue(instance)}
return all_rds_instance_state
# We further refine the list and remove instances which are stopped. We will work on
# Instances with Available state only
def _get_unused_instances(self):
instances = self._get_instance_connection_info()
all_instance_state = self._fetch_all_rds_instance_state()
instances_to_delete = []
try:
for instance_name in instances.keys():
if instances[instance_name] == 0.0 and all_instance_state[instance_name]['status'] == 'available' and all_instance_state[instance_name]['keep_alive'] is not True:
instances_to_delete.append(instance_name)
except BaseException:
print("Check if instance connection_info is empty")
return instances_to_delete
# Function to delete the instances reported in final list.It deletes instances with 0 connection
# and status as available
def terminate_rds_instances(self, dry_run=True):
if dry_run:
message = 'DRY-RUN'
else:
message = 'DELETE'
rdsnames = self._get_unused_instances()
if len(rdsnames) > 0:
for rdsname in rdsnames:
try:
response = self.rds_object.describe_db_instances(
DBInstanceIdentifier=rdsname
)
termination_protection = response['DBInstances'][0]['DeletionProtection']
except botocore.exceptions.ClientError as error:
print("[ERROR]: {}. Failed to be terminated due to '{}'".format(rdsname, error.response['Error']['Message']))
if termination_protection is True:
try:
print("Removing delete termination for {}".format(rdsname))
if not dry_run:
response = self.rds_object.modify_db_instance(
DBInstanceIdentifier=rdsname,
DeletionProtection=False
)
except BaseException as e:
print(
"[ERROR]: Could not modify db termination protection "
"due to following error:\n " + str(
e))
exit(1)
try:
if not dry_run:
print("[WARNING]: {} deletion request in progress!".format(rdsname))
response = self.rds_object.delete_db_instance(
DBInstanceIdentifier=rdsname,
SkipFinalSnapshot=False,
FinalDBSnapshotIdentifier="{}-{}".format(rdsname, datetime.now().strftime("%m-%d-%YT%H-%M-%S"))
)
print('[{}]: RDS instance {} deleted'.format(message, rdsname))
except botocore.exceptions.ClientError as error:
print("[ERROR]: {} failed to be stopped due to {}".format(rdsname, error.response['Error']['Message']))
else:
print("No RDS instance marked for deletion")
# Function to shutdown the instances reported in final list.It shutdown instances with 0 connection
# and status as available
def shutdown_rds_instance(self, dry_run=True):
if dry_run:
message = 'DRY-RUN'
else:
message = 'SHUTDOWN'
rdsnames = self._get_unused_instances()
if len(rdsnames) > 0:
for rdsname in rdsnames:
try:
self.rds_object.describe_db_instances(
DBInstanceIdentifier=rdsname
)
except BaseException as e:
print('[ERROR]: reading details' + str(e))
exit(1)
try:
if not dry_run:
print("[WARNING]: {} stop request in progress!".format(rdsname))
self.rds_object.stop_db_instance(
DBInstanceIdentifier=rdsname,
DBSnapshotIdentifier="{}-{}".format(rdsname, datetime.now().strftime("%m-%d-%YT%H-%M-%S"))
)
print('[{}]: RDS instance {} stopped'.format(message, rdsname))
except botocore.exceptions.ClientError as error:
print("[ERROR]: {} failed to be stopped due to {}".format(rdsname, error.response['Error']['Message']))
else:
print("No RDS instance marked for shutdown")
if __name__ == "__main__":
args = define_parameters().parse_args()
session = boto3.Session(profile_name=args.profile)
cloud_watch_object = session.client('cloudwatch', region_name=args.region)
rds_object = session.client('rds', region_name=args.region)
rds_termination_object = RDSTermination(cloud_watch_object, rds_object)
if args.shutdown is True:
rds_termination_object.shutdown_rds_instance(dry_run=args.dry_run)
else:
rds_termination_object.terminate_rds_instances(dry_run=args.dry_run)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment