Skip to content

Instantly share code, notes, and snippets.

@polius
Last active June 11, 2024 16:11
Show Gist options
  • Save polius/5760117924b55d73f11434143596fd10 to your computer and use it in GitHub Desktop.
Save polius/5760117924b55d73f11434143596fd10 to your computer and use it in GitHub Desktop.
Retrieve the AWS RDS pricing for an Aurora cluster comparing Aurora Standard and Aurora I/O-Optimized modes.
# Install dependencies: python3 -m pip install boto3 rich
import json
import boto3
import argparse
from decimal import Decimal
from datetime import datetime, timedelta, timezone
from rich.console import Console
from rich.table import Table
from rich.tree import Tree
class main:
def __init__(self):
# Grab parameters
parser = argparse.ArgumentParser()
parser.add_argument('--region', required=True, help='AWS region name (eu-west-1)')
parser.add_argument('--cluster', required=True, help='Database cluster name')
parser.add_argument('--period', required=False, default=30, type=int, choices=range(1, 366), help='Period (default: 30 days, maximum: 365 days)')
parser.add_argument('--skip-today', required=False, action='store_true', help='Skip the current day')
parser.add_argument('--profile', required=False, help='AWS profile name ($ aws configure --profile customer)')
self._args = parser.parse_args()
# Create a session with the specified profile
session = boto3.Session(profile_name=self._args.profile)
# Init console
self._console = Console()
# Create Boto3 clients
self._cloudwatch = session.client('cloudwatch', region_name=self._args.region)
self._rds = session.client('rds', region_name=self._args.region)
self._pricing = boto3.client('pricing', region_name='us-east-1')
# Calculate start and end time
if self._args.period == 1 and self._args.skip_today:
self._console.error("ERROR: The 'period' parameter must be set to a value greater than 1 when the 'skip-today' parameter is enabled.", style="red")
return
if self._args.skip_today:
self._end_time = (datetime.now(timezone.utc) - timedelta(hours=24)).replace(hour=23, minute=59, second=59, microsecond=999999)
else:
self._end_time = (datetime.now(timezone.utc) - timedelta(hours=1)).replace(minute=59, second=59, microsecond=999999)
self._start_time = (datetime.now(timezone.utc) - timedelta(days=self._args.period-1)).replace(hour=0, minute=0, second=0, microsecond=0)
# Start computing
self.compute()
def compute(self):
# Get Data from RDS Cluster
try:
request = self.__rds_request()
except WarningException as e:
self._console.print(e, style="#FF6961")
request = e.data
except Exception as e:
self._console.print(e, style="#FF6961")
return
# Get resources
cluster = request['cluster']
instances = request['instances']
# Get Pricing
region_code = self.__get_region_code(self._args.region)
region_code = f"{region_code}-" if len(region_code) > 0 else ''
pricing = {
"standard": {
"compute": {
k: self.compute_instance_pricing(k, v, False)
for k,v in {i['DBInstanceClass']: i['Engine'] for i in instances}.items()
},
"storage": self.__pricing_request([('usageType', f"{region_code}Aurora:StorageUsage"),('operation','CreateDBInstance')]),
"iops": self.__pricing_request([('usageType', f"{region_code}Aurora:StorageIOUsage"),('operation','CreateDBInstance')]),
},
"optimized": {
"compute": {
k: self.compute_instance_pricing(k, v, True)
for k,v in {i['DBInstanceClass']: i['Engine'] for i in instances}.items()
},
"storage": self.__pricing_request([('usageType', f"{region_code}Aurora:IO-OptimizedStorageUsage"),('operation','CreateDBInstance')]),
"iops": 0,
}
}
# Compute Metrics
costs = {
'standard': {
'compute': 0,
'storage': 0,
'iops': 0,
},
'optimized': {
'compute': 0,
'storage': 0,
'iops': 0,
}
}
# Get CloudWatch Metrics
volume_bytes_used = self.__cloudwatch_request('VolumeBytesUsed', [{'Name': 'DBClusterIdentifier', 'Value': self._args.cluster}], 86400, 'Maximum')['MetricDataResults'][0]['Values']
volume_read_iops = self.__cloudwatch_request('VolumeReadIOPs', [{'Name': 'DBClusterIdentifier', 'Value': self._args.cluster}], 86400, 'Sum')['MetricDataResults'][0]['Values']
volume_write_iops = self.__cloudwatch_request('VolumeWriteIOPs', [{'Name': 'DBClusterIdentifier', 'Value': self._args.cluster}], 86400, 'Sum')['MetricDataResults'][0]['Values']
# Compute costs
for i in volume_bytes_used:
costs['standard']['storage'] += pricing['standard']['storage'] * Decimal(str(i/1024/1024/1024)) * Decimal(str(1/30))
costs['optimized']['storage'] += pricing['optimized']['storage'] * Decimal(str(i/1024/1024/1024)) * Decimal(str(1/30))
for i in volume_read_iops:
costs['standard']['iops'] += pricing['standard']['iops'] * Decimal(str(i))
for i in volume_write_iops:
costs['standard']['iops'] += pricing['standard']['iops'] * Decimal(str(i))
for i in instances:
if i['DBInstanceClass'] == 'db.serverless':
serverless_database_capacity = self.__cloudwatch_request('ServerlessDatabaseCapacity', [{'Name': 'DBInstanceIdentifier', 'Value': i['DBInstanceIdentifier']}], 3600, 'Average')['MetricDataResults'][0]['Values']
for acu in [Decimal(str(round(i,1))) for i in serverless_database_capacity]:
costs['standard']['compute'] += pricing['standard']['compute'][i['DBInstanceClass']] * acu
costs['optimized']['compute'] += pricing['optimized']['compute'][i['DBInstanceClass']] * acu
else:
costs['standard']['compute'] += pricing['standard']['compute'][i['DBInstanceClass']] * sum(1 for x in instances if x['DBInstanceClass'] == i['DBInstanceClass']) * 24 * self._args.period
costs['optimized']['compute'] += pricing['optimized']['compute'][i['DBInstanceClass']] * sum(1 for x in instances if x['DBInstanceClass'] == i['DBInstanceClass']) * 24 * self._args.period
costs['standard']['total'] = costs['standard']['compute'] + costs['standard']['storage'] + costs['standard']['iops']
costs['optimized']['total'] = costs['optimized']['compute'] + costs['optimized']['storage'] + costs['optimized']['iops']
# Print Summary
self._console.print("+---------+", style="#FFA500")
self._console.print("| SUMMARY |", style="#FFA500")
self._console.print("+---------+", style="#FFA500")
self._console.print("Scan period:", style="bold")
self._console.print(f"- Start: {self._start_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
self._console.print(f"- End: {self._end_time.strftime('%Y-%m-%d %H:%M:%S')} UTC")
tree = Tree(f"[bold]Cluster: {self._args.cluster} ({instances[0]['Engine']}) | ({cluster['DbClusterResourceId']})[/bold]")
for i in instances:
tree.add(f"{i['DBInstanceIdentifier']} ({i['DBInstanceClass']})")
self._console.print(tree)
# Print Pricing
self._console.print("+---------+", style="#FFA500")
self._console.print("| PRICING |", style="#FFA500")
self._console.print("+---------+", style="#FFA500")
table = Table(title="", title_style='#FFA500', title_justify="left", show_lines=True)
columns = ['Pricing Component','Aurora Standard','Aurora I/O-Optimized']
rows = [
[
"Compute",
"\n".join([f"{instance}: ${price:.3f} per hour" for instance, price in pricing['standard']['compute'].items()]),
"\n".join([f"{instance}: ${price:.3f} per hour" for instance, price in pricing['optimized']['compute'].items()]),
],
[
"Storage",
f"${pricing['standard']['storage']:.3f} per GB-month",
f"${pricing['optimized']['storage']:.3f} per GB-month",
],
[
"IOPS",
f"${(pricing['standard']['iops']*1000000):.3f} per 1 million requests",
"Included",
],
]
for i, column in enumerate(columns):
table.add_column(column)
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Print Costs
self._console.print("+-------+", style="#FFA500")
self._console.print("| COSTS |", style="#FFA500")
self._console.print("+-------+", style="#FFA500")
table = Table(title="", show_lines=True)
columns = ['Cost Component','Aurora Standard','Aurora I/O-Optimized']
rows = [
[
"Compute",
f"${costs['standard']['compute']:.3f}",
f"${costs['optimized']['compute']:.3f}",
],
[
"Storage",
f"${costs['standard']['storage']:.3f}",
f"${costs['optimized']['storage']:.3f}",
],
[
"IOPS",
f"${costs['standard']['iops']:.3f}",
"$0.00",
],
[
"[bold]Total[/bold]",
f"{'[#32CD32][bold]' if costs['standard']['total'] <= costs['optimized']['total'] else '[#FF0000]'}${(costs['standard']['total']):.3f}",
f"{'[#32CD32][bold]' if costs['optimized']['total'] < costs['standard']['total'] else '[#FF0000]'}${(costs['optimized']['total']):.3f}",
],
]
for i, column in enumerate(columns):
table.add_column(column)
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Print Overall
if costs['standard']['total'] <= costs['optimized']['total']:
self._console.print(f"--> Aurora Standard is [white bold]{((costs['optimized']['total'] - costs['standard']['total'])/costs['optimized']['total'] * 100):.2f}%[/white bold] cheaper than Aurora I/O-Optimized.")
else:
self._console.print(f"--> Aurora I/O-Optimized is [white bold]{((costs['standard']['total'] - costs['optimized']['total'])/costs['standard']['total'] * 100):.2f}%[/white bold] cheaper than Aurora Standard.")
def compute_instance_pricing(self, instance, engine, io_optimized):
region_code = self.__get_region_code(self._args.region)
region_code = f"{region_code}-" if len(region_code) > 0 else ''
if instance == 'db.serverless':
usageType = f"{region_code}Aurora:ServerlessV2IOOptimizedUsage" if io_optimized else f"{region_code}Aurora:ServerlessV2Usage"
else:
usageType = f"{region_code}InstanceUsageIOOptimized:{instance}" if io_optimized else f"{region_code}InstanceUsage:{instance}"
return self.__pricing_request([('usageType', usageType.replace('xlarge','xl')),('databaseEngine', engine)])
####################
# Internal Methods #
####################
def __rds_request(self):
data = {"cluster": None, "instances": []}
# Get cluster information
try:
response = self._rds.describe_db_clusters(DBClusterIdentifier=self._args.cluster)['DBClusters']
except Exception as e:
raise Exception(str(e))
if len(response) == 0:
raise Exception(f"The '{self._args.cluster}' cluster does not exist. Skipping...")
# Check the cluster is Aurora
if response[0]['Engine'] not in ('aurora-mysql','aurora-postgresql'):
raise Exception(f"The '{self._args.cluster}' cluster is not an Aurora. Skipping...")
# Store cluster identifier
data['cluster'] = {"DbClusterResourceId": response[0]['DbClusterResourceId'], "Engine": response[0]['Engine'], "EngineVersion": response[0]['EngineVersion']}
# Get instances information
response = self._rds.describe_db_instances(
Filters=[
{
'Name': 'db-cluster-id',
'Values': [self._args.cluster]
},
],
)
data['instances'] = [
{
'DBInstanceIdentifier': i['DBInstanceIdentifier'],
'DBInstanceClass': i['DBInstanceClass'],
'Engine': i['Engine'].replace('aurora-mysql','Aurora MySQL').replace('aurora-postgresql','Aurora PostgreSQL')
}
for i in response['DBInstances']
]
# Check MySQL engine version requirements (3.03.1)
if data['cluster']['Engine'] == 'aurora-mysql' and data['cluster']['EngineVersion'].split('mysql_aurora.')[1] < '3.04.1':
raise WarningException(data, f"Warning: The '{self._args.cluster}' Aurora MySQL cluster is not compatible with Aurora I/O-Optimized mode. The current version is {data['cluster']['EngineVersion'].split('mysql_aurora.')[1]} and the minimum supported is 3.03.1.")
# Check PostgreSQL engine version requirements (13.10 and higher, 14.7 and higher or 15.2 and higher)
if data['cluster']['Engine'] == 'aurora-postgresql':
if data['cluster']['EngineVersion'].startswith('13.') and not self.__check_engine_version(data['cluster']['EngineVersion'], '13.10'):
raise WarningException(data, f"Warning: The '{self._args.cluster}' PostgreSQL MySQL cluster is not compatible with Aurora I/O-Optimized mode. The current version is {data['cluster']['EngineVersion']} and the minimum supported is 13.10 and higher, 14.7 and higher or 15.2 and higher.")
if data['cluster']['EngineVersion'].startswith('14.') and not self.__check_engine_version(data['cluster']['EngineVersion'], '14.7'):
raise WarningException(data, f"Warning: The '{self._args.cluster}' PostgreSQL MySQL cluster is not compatible with Aurora I/O-Optimized mode. The current version is {data['cluster']['EngineVersion']} and the minimum supported is 13.10 and higher, 14.7 and higher or 15.2 and higher.")
if data['cluster']['EngineVersion'].startswith('15.') and not self.__check_engine_version(data['cluster']['EngineVersion'], '15.2'):
raise WarningException(data, f"Warning: The '{self._args.cluster}' PostgreSQL MySQL cluster is not compatible with Aurora I/O-Optimized mode. The current version is {data['cluster']['EngineVersion']} and the minimum supported is 13.10 and higher, 14.7 and higher or 15.2 and higher.")
# Check Instance class requirements
for i in data['instances']:
if not i['DBInstanceClass'].startswith(('db.serverless','db.t3.','db.r5.','db.r6.','db.t4g.','db.r6g.','db.r7g.','db.x2g.')):
raise WarningException(data, f"Warning: The '{self._args.cluster}' cluster is not compatible with Aurora I/O-Optimized mode. Only the following instance classes are supported: db.serverless, t3, r5, r6, t4g, r6g, r7g, x2g.")
# Return data
return data
def __pricing_request(self, filters):
params = {
"ServiceCode": "AmazonRDS",
"Filters": [
{
'Type': 'TERM_MATCH',
'Field': 'regionCode',
'Value': self._args.region,
},
*[
{
"Field": field,
"Type": "TERM_MATCH",
"Value": value,
}
for field, value in filters
]
]
}
response = json.loads(self._pricing.get_products(**params)['PriceList'][0])
return Decimal(str({mode: max([j['pricePerUnit'] for i in value.values() for j in i['priceDimensions'].values()], key=lambda x: x.get('USD', 0))['USD'] for mode, value in response['terms'].items()}['OnDemand']))
def __cloudwatch_request(self, metric_name, dimensions, period, stat):
response = self._cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'm1',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/RDS',
'MetricName': metric_name,
'Dimensions': dimensions
},
'Period': period,
'Stat': stat,
},
'ReturnData': True
},
],
StartTime=self._start_time.isoformat(),
EndTime=self._end_time.isoformat()
)
return response
def __get_region_code(self, region):
# https://docs.aws.amazon.com/AmazonS3/latest/userguide/aws-usage-report-understand.html
region_codes = {
"ap-east-1": "APE1",
"ap-northeast-1": "APN1",
"ap-northeast-2": "APN2",
"ap-northeast-3": "APN3",
"ap-southeast-1": "APS1",
"ap-southeast-2": "APS2",
"ap-south-1": "APS3",
"ap-southeast-3": "APS4",
"ap-south-2": "APS5",
"ap-southeast-4": "APS6",
"ca-central-1": "CAN1",
"cn-north-1": "CNN1",
"cn-northwest-1": "CNW1",
"af-south-1": "AFS1",
"eu-central-2": "EUC2",
"eu-north-1": "EUN1",
"eu-south-2": "EUS2",
"eu-central-1": "EUC1",
"eu-west-1": "EU",
"eu-south-1": "EUS1",
"eu-west-2": "EUW2",
"eu-west-3": "EUW3",
"il-central-1": "ILC1",
"me-central-1": "MEC1",
"me-south-1": "MES1",
"sa-east-1": "SAE1",
"us-gov-west-1": "UGW1",
"us-gov-east-1": "UGE1",
"us-east-1": "",
"us-east-2": "USE2",
"us-west-1": "USW1",
"us-west-2": "USW2",
}
return region_codes[region]
def __check_engine_version(self, current, minimum):
parts_current = [int(part) for part in current.split('.')]
parts_minimum = [int(part) for part in minimum.split('.')]
while len(parts_current) != len(parts_minimum):
if len(parts_current) < len(parts_minimum):
parts_current.append(0)
else:
parts_minimum.append(0)
for part_current, part_minimum in zip(parts_current, parts_minimum):
if part_current < part_minimum:
return False
return True
class WarningException(Exception):
def __init__(self, data, message):
super().__init__(message)
self.data = data
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment