Skip to content

Instantly share code, notes, and snippets.

@polius
Created February 14, 2024 14:31
Show Gist options
  • Save polius/a427425d393e6029d57993fe0146c73e to your computer and use it in GitHub Desktop.
Save polius/a427425d393e6029d57993fe0146c73e to your computer and use it in GitHub Desktop.
A comprehensive tool for analyzing the workload of a DynamoDB table and extracting valuable insights.
# Install dependencies: python3 -m pip install boto3 numpy rich
import json
import boto3
import argparse
import numpy as np
from decimal import Decimal
from datetime import datetime, timedelta
from rich.console import Console
from rich.table import Table
class main:
def __init__(self):
# Grab parameters
parser = argparse.ArgumentParser()
parser.add_argument('--region', required=True, help='AWS region name (eu-west-1)')
parser.add_argument('--table', required=True, help='Table name')
parser.add_argument('--profile', required=False, help='AWS profile name (~/.aws/config)')
parser.add_argument('--period', required=False, default=7, type=int, choices=range(1, 61), help='Period (default: 7 days, maximum: 60 days)')
parser.add_argument('--skip-today', required=False, action='store_true', help='Skip the current day')
self._args = parser.parse_args()
# Init console
self._console = Console()
# Create a session with the specified profile
session = boto3.Session(profile_name=self._args.profile)
# Create Boto3 clients
self._dynamodb = session.client('dynamodb', region_name=self._args.region)
self._cloudwatch = session.client('cloudwatch', region_name=self._args.region)
self._autoscaling = session.client('application-autoscaling', region_name=self._args.region)
self._pricing = boto3.client('pricing', region_name='us-east-1')
# Calculate start and end time
if self._args.period == 1 and self._args.skip_today:
print("ERROR: The 'period' parameter must be set to a value greater than 1 when the 'skip-today' parameter is enabled.")
return
if self._args.skip_today:
self._end_time = (datetime.utcnow() - timedelta(hours=24)).replace(hour=23, minute=59, second=59, microsecond=999999)
else:
self._end_time = (datetime.utcnow() - timedelta(hours=1)).replace(minute=59, second=59, microsecond=999999)
self._start_time = (datetime.utcnow() - timedelta(days=self._args.period-1)).replace(hour=0, minute=0, second=0, microsecond=0)
self._start_time = self._start_time.isoformat()
self._end_time = self._end_time.isoformat()
# Start computing
self.compute()
def compute(self):
# Summary
summary = self.summary()
# Capacity Units Consumed
self.capacity_units_consumed(summary)
# Capacity Units Summary
cus = self.capacity_units_summary(summary)
# Capacity Units Provisioned
cup = self.capacity_units_provisioned(summary)
# Calculate Throttling
self.throttling(summary, cup)
# Cost comparison
self.cost_comparison(summary, cus, cup)
def summary(self):
# Init data
data = {
'Timestamps': [],
'ConsumedReadCapacityUnits': [],
'ConsumedWriteCapacityUnits': [],
'ProvisionedRCU': 0,
'ProvisionedWRU': 0,
'GSI': {},
'Autoscaling': {},
}
# Get Table provisioned throughput (RCU & WCU)
dynamodb = self._dynamodb.describe_table(TableName=self._args.table)
data['ProvisionedRCU'] = dynamodb['Table']['ProvisionedThroughput']['ReadCapacityUnits']
data['ProvisionedWCU'] = dynamodb['Table']['ProvisionedThroughput']['WriteCapacityUnits']
data['Status'] = dynamodb['Table']['TableStatus']
# Store table size
data['TableSizeBytes'] = dynamodb['Table']['TableSizeBytes']
# Get GSI provisioned throughput (RCU & WCU)
if 'GlobalSecondaryIndexes' in dynamodb['Table']:
for i in dynamodb['Table']['GlobalSecondaryIndexes']:
data['GSI'][i['IndexName']] = {
'KeySchema': i['KeySchema'],
'Projection': i['Projection'],
'ProvisionedRCU': i['ProvisionedThroughput']['ReadCapacityUnits'],
'ProvisionedWCU': i['ProvisionedThroughput']['WriteCapacityUnits'],
'Status': i['IndexStatus'],
}
# Get Table ConsumedReadCapacityUnits
dimensions = [{'Name': 'TableName', 'Value': self._args.table}]
metric_name = 'ConsumedReadCapacityUnits'
period = 86400
stat = 'Sum'
cloudwatch = self.__cloudwatch_request(self._cloudwatch, metric_name, dimensions, period, stat, self._start_time, self._end_time)
data['Timestamps'] = cloudwatch['MetricDataResults'][0]['Timestamps']
data['ConsumedReadCapacityUnits'] = cloudwatch['MetricDataResults'][0]['Values']
# Get Table ConsumedWriteCapacityUnits
metric_name = 'ConsumedWriteCapacityUnits'
cloudwatch = self.__cloudwatch_request(self._cloudwatch, metric_name, dimensions, period, stat, self._start_time, self._end_time)
data['ConsumedWriteCapacityUnits'] = cloudwatch['MetricDataResults'][0]['Values']
# Get table autoscaling policies
response = self._autoscaling.describe_scalable_targets(ServiceNamespace='dynamodb', ResourceIds=[f"table/{self._args.table}"])
for i in response['ScalableTargets']:
data['Autoscaling'][i['ScalableDimension']] = {
"minimum_capacity_units": i['MinCapacity'],
"maximum_capacity_units": i['MaxCapacity'],
}
response = self._autoscaling.describe_scaling_policies(ServiceNamespace='dynamodb', ResourceId=f"table/{self._args.table}")
for i in response['ScalingPolicies']:
data['Autoscaling'][i['ScalableDimension']]['target_utilization'] = i['TargetTrackingScalingPolicyConfiguration']['TargetValue']
# Get GSI ConsumedReadCapacityUnits & ConsumedWriteCapacityUnits
for gsi in data['GSI'].keys():
dimensions = [{'Name': 'TableName', 'Value': self._args.table}, {'Name': 'GlobalSecondaryIndexName', 'Value': gsi}]
metrics = ['ConsumedReadCapacityUnits','ConsumedWriteCapacityUnits']
for metric_name in metrics:
cloudwatch = self.__cloudwatch_request(self._cloudwatch, metric_name, dimensions, period, stat, self._start_time, self._end_time)
data['GSI'][gsi][metric_name] = cloudwatch['MetricDataResults'][0]['Values']
# Get GSI autoscaling policies
for gsi in data['GSI'].keys():
data['GSI'][gsi]['Autoscaling'] = {}
response = self._autoscaling.describe_scalable_targets(ServiceNamespace='dynamodb', ResourceIds=[f"table/{self._args.table}/index/{gsi}"])
for i in response['ScalableTargets']:
data['GSI'][gsi]['Autoscaling'][i['ScalableDimension']] = {
"minimum_capacity_units": i['MinCapacity'],
"maximum_capacity_units": i['MaxCapacity'],
}
response = self._autoscaling.describe_scaling_policies(ServiceNamespace='dynamodb', ResourceId=f"table/{self._args.table}/index/{gsi}")
for i in response['ScalingPolicies']:
data['GSI'][gsi]['Autoscaling'][i['ScalableDimension']]['target_utilization'] = i['TargetTrackingScalingPolicyConfiguration']['TargetValue']
# Print metadata
self._console.print("+---------+", style="#f49ac2")
self._console.print("| SUMMARY |", style="#f49ac2")
self._console.print("+---------+", style="#f49ac2")
table = Table(title="", title_style='#FFA500', title_justify="left")
columns = ['Name','Creation date','Class','Items','Storage','Mode','Provisioned RCU','Provisioned WCU','Deletion protection','GSIs','Status']
rows = [[
f"[Table] {self._args.table}",
f"{dynamodb['Table']['CreationDateTime'].strftime('%Y-%m-%d %H:%M:%S')} UTC",
f"{'STANDARD' if 'TableClassSummary' not in dynamodb['Table'] else dynamodb['Table']['TableClassSummary']['TableClass']}",
str(dynamodb['Table']['ItemCount']),
f"{data['TableSizeBytes']/1024/1024:.2f} MB",
'ON DEMAND' if 'BillingMode' not in dynamodb['Table'] or dynamodb['Table']['BillingModeSummary']['BillingMode'] == 'PAY_PER_REQUEST' else 'PROVISIONED',
f"{data['ProvisionedRCU']} ({data['Autoscaling']['dynamodb:table:ReadCapacityUnits']['minimum_capacity_units']} - {data['Autoscaling']['dynamodb:table:ReadCapacityUnits']['maximum_capacity_units']} | {data['Autoscaling']['dynamodb:table:ReadCapacityUnits']['target_utilization']}%)" if data['Autoscaling'] else str(data['ProvisionedRCU']),
f"{data['ProvisionedWCU']} ({data['Autoscaling']['dynamodb:table:WriteCapacityUnits']['minimum_capacity_units']} - {data['Autoscaling']['dynamodb:table:WriteCapacityUnits']['maximum_capacity_units']} | {data['Autoscaling']['dynamodb:table:WriteCapacityUnits']['target_utilization']}%)" if data['Autoscaling'] else str(data['ProvisionedWCU']),
f"{'[bright_green]' if dynamodb['Table']['DeletionProtectionEnabled'] else '[bright_red]'}{str(dynamodb['Table']['DeletionProtectionEnabled']).upper()}",
str(len(data['GSI'].keys())),
data['Status'],
]]
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
if data['GSI']:
table = Table(title="", title_style='#FFA500', title_justify="left")
columns = ['Name','Mode','Provisioned RCU','Provisioned WCU','Projection','Status']
rows = []
for k, v in data['GSI'].items():
row = [
f"[GSI] {k}",
'ON DEMAND' if v['ProvisionedRCU'] == 0 and v['ProvisionedWCU'] == 0 else 'PROVISIONED',
f"{v['ProvisionedRCU']} ({v['Autoscaling']['dynamodb:index:ReadCapacityUnits']['minimum_capacity_units']} - {v['Autoscaling']['dynamodb:index:ReadCapacityUnits']['maximum_capacity_units']} | {v['Autoscaling']['dynamodb:index:ReadCapacityUnits']['target_utilization']}%)" if v['Autoscaling'] else str(v['ProvisionedRCU']),
f"{v['ProvisionedWCU']} ({v['Autoscaling']['dynamodb:index:WriteCapacityUnits']['minimum_capacity_units']} - {v['Autoscaling']['dynamodb:index:WriteCapacityUnits']['maximum_capacity_units']} | {v['Autoscaling']['dynamodb:index:WriteCapacityUnits']['target_utilization']}%)" if v['Autoscaling'] else str(v['ProvisionedWCU']),
f"{'[bright_red]' if v['Projection']['ProjectionType'] == 'ALL' else ''}{v['Projection']['ProjectionType']}",
v['Status'],
]
rows.append(row)
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
return data
def capacity_units_consumed(self, data):
self._console.print("+-------------------------+", style="#f49ac2")
self._console.print("| CAPACITY UNITS CONSUMED |", style="#f49ac2")
self._console.print("+-------------------------+", style="#f49ac2")
table = Table(title="", title_style='#FFA500', title_justify="left")
columns = ['Date','Table RCU','Table WCU'] + [f"[GSI] {i} RCU" for i in data['GSI'].keys()] + [f"[GSI] {i} WCU" for i in data['GSI'].keys()]
rows = []
for i in range(len(data['Timestamps'])):
row = [
data['Timestamps'][i].strftime('%Y-%m-%d'),
f"{(data['ConsumedReadCapacityUnits'][i] / 86400):.3f}",
f"{(data['ConsumedWriteCapacityUnits'][i] / 86400):.3f}",
]
for gsi in data['GSI'].keys():
row.extend([
f"{(data['GSI'][gsi]['ConsumedReadCapacityUnits'][i] / 86400 if i < len(data['GSI'][gsi]['ConsumedReadCapacityUnits']) else 0):.3f}",
f"{(data['GSI'][gsi]['ConsumedWriteCapacityUnits'][i] / 86400 if i < len(data['GSI'][gsi]['ConsumedReadCapacityUnits']) else 0):.3f}",
])
rows.append(row)
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
def capacity_units_provisioned(self, data):
def calc(dimensions, object):
capacity_units_provisioned = {}
period = 300
consumed_read_capacity_units_5m = self.__cloudwatch_request(self._cloudwatch, 'ConsumedReadCapacityUnits', dimensions, period, 'Sum', self._start_time, self._end_time)
consumed_write_capacity_units_5m = self.__cloudwatch_request(self._cloudwatch, 'ConsumedWriteCapacityUnits', dimensions, period, 'Sum', self._start_time, self._end_time)
provisioned_read_capacity_units_5m = self.__cloudwatch_request(self._cloudwatch, 'ProvisionedReadCapacityUnits', dimensions, period, 'Maximum', self._start_time, self._end_time)
provisioned_write_capacity_units_5m = self.__cloudwatch_request(self._cloudwatch, 'ProvisionedWriteCapacityUnits', dimensions, period, 'Maximum', self._start_time, self._end_time)
for i, value in enumerate(consumed_read_capacity_units_5m['MetricDataResults'][0]['Timestamps']):
if value not in capacity_units_provisioned:
capacity_units_provisioned[value] = {"ConsumedReadCapacityUnits": 0, "ConsumedWriteCapacityUnits": 0, "ProvisionedReadCapacityUnits": 0, "ProvisionedWriteCapacityUnits": 0}
capacity_units_provisioned[value]['ConsumedReadCapacityUnits'] = consumed_read_capacity_units_5m['MetricDataResults'][0]['Values'][i]
for i, value in enumerate(consumed_write_capacity_units_5m['MetricDataResults'][0]['Timestamps']):
if value not in capacity_units_provisioned:
capacity_units_provisioned[value] = {"ConsumedReadCapacityUnits": 0, "ConsumedWriteCapacityUnits": 0, "ProvisionedReadCapacityUnits": 0, "ProvisionedWriteCapacityUnits": 0}
capacity_units_provisioned[value]['ConsumedWriteCapacityUnits'] = consumed_write_capacity_units_5m['MetricDataResults'][0]['Values'][i]
for i, value in enumerate(provisioned_read_capacity_units_5m['MetricDataResults'][0]['Timestamps']):
if value not in capacity_units_provisioned:
capacity_units_provisioned[value] = {"ConsumedReadCapacityUnits": 0, "ConsumedWriteCapacityUnits": 0, "ProvisionedReadCapacityUnits": 0, "ProvisionedWriteCapacityUnits": 0}
capacity_units_provisioned[value]['ProvisionedReadCapacityUnits'] = provisioned_read_capacity_units_5m['MetricDataResults'][0]['Values'][i]
for i, value in enumerate(provisioned_write_capacity_units_5m['MetricDataResults'][0]['Timestamps']):
if value not in capacity_units_provisioned:
capacity_units_provisioned[value] = {"ConsumedReadCapacityUnits": 0, "ConsumedWriteCapacityUnits": 0, "ProvisionedReadCapacityUnits": 0, "ProvisionedWriteCapacityUnits": 0}
capacity_units_provisioned[value]['ProvisionedWriteCapacityUnits'] = provisioned_write_capacity_units_5m['MetricDataResults'][0]['Values'][i]
capacity_units_provisioned_summary = {"RCU": {}, "WCU": {}}
for i in capacity_units_provisioned.values():
# RCU
if i['ProvisionedReadCapacityUnits'] > 0:
if i['ProvisionedReadCapacityUnits'] not in capacity_units_provisioned_summary['RCU']:
capacity_units_provisioned_summary['RCU'][i['ProvisionedReadCapacityUnits']] = 1
else:
capacity_units_provisioned_summary['RCU'][i['ProvisionedReadCapacityUnits']] += 1
# WCU
if i['ProvisionedWriteCapacityUnits'] > 0:
if i['ProvisionedWriteCapacityUnits'] not in capacity_units_provisioned_summary['WCU']:
capacity_units_provisioned_summary['WCU'][i['ProvisionedWriteCapacityUnits']] = 1
else:
capacity_units_provisioned_summary['WCU'][i['ProvisionedWriteCapacityUnits']] += 1
# Order
capacity_units_provisioned_summary['RCU'] = dict(sorted(capacity_units_provisioned_summary['RCU'].items(), key=lambda x: x[1], reverse=True))
capacity_units_provisioned_summary['WCU'] = dict(sorted(capacity_units_provisioned_summary['WCU'].items(), key=lambda x: x[1], reverse=True))
# Calculate sums
capacity_units_provisioned_summary_rcu_sum = sum([i for i in capacity_units_provisioned_summary['RCU'].values()])
capacity_units_provisioned_summary_wcu_sum = sum([i for i in capacity_units_provisioned_summary['WCU'].values()])
# Show table RCU
table = Table(title=f"[{object['type']}] Provisioned RCU in the last {self._args.period} days (TOP 10).", title_style='#FFA500', title_justify="left", min_width=50)
columns = ['Name','Capacity','Percentage','Hours']
rows = []
for k, v in list(capacity_units_provisioned_summary['RCU'].items())[:10]:
row = [
object['name'],
str(k),
f"{(v/capacity_units_provisioned_summary_rcu_sum)*100:.2f}%",
str((v/capacity_units_provisioned_summary_rcu_sum) * int(self._args.period) * 24),
]
rows.append(row)
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Show table WCU
table = Table(title=f"[{object['type']}] Provisioned WCU in the last {self._args.period} days (TOP 10).", title_style='#FFA500', title_justify="left", min_width=50)
columns = ['Name','Capacity','Percentage','Hours']
rows = []
for k, v in list(capacity_units_provisioned_summary['WCU'].items())[:10]:
row = [
object['name'],
str(k),
f"{(v/capacity_units_provisioned_summary_wcu_sum)*100:.2f}%",
str((v/capacity_units_provisioned_summary_wcu_sum) * int(self._args.period) * 24),
]
rows.append(row)
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Return data
return capacity_units_provisioned, capacity_units_provisioned_summary
# Calculate Capacity Units Provisioned
cup = {
'table': {},
'GSI': {},
}
self._console.print("+----------------------------+", style="#f49ac2")
self._console.print("| PROVISIONED CAPACITY UNITS |", style="#f49ac2")
self._console.print("+----------------------------+", style="#f49ac2")
table_raw, table_agg = calc(dimensions=[{'Name': 'TableName', 'Value': self._args.table}], object={"type": 'Table', "name": self._args.table})
cup['table'] = {'raw': table_raw, 'agg': table_agg}
for gsi in data['GSI'].keys():
gsi_raw, gsi_agg = calc(dimensions=[{'Name': 'TableName', 'Value': self._args.table},{'Name': 'GlobalSecondaryIndexName', 'Value': gsi}], object={"type": 'GSI', "name": gsi})
cup['GSI'][gsi] = {'raw': gsi_raw, 'agg': gsi_agg}
# Return calculated data
return cup
def capacity_units_summary(self, summary):
cus = {
'table': {},
'gsis': {},
}
table_rcu_detailed = self.__cloudwatch_request(
self._cloudwatch,
metric_name='ConsumedReadCapacityUnits',
dimensions=[{'Name': 'TableName', 'Value': self._args.table}],
period=300,
stat='Sum',
start_time=self._start_time,
end_time=self._end_time
)
table_wcu_detailed = self.__cloudwatch_request(
self._cloudwatch,
metric_name='ConsumedWriteCapacityUnits',
dimensions=[{'Name': 'TableName', 'Value': self._args.table}],
period=300,
stat='Sum',
start_time=self._start_time,
end_time=self._end_time
)
cus['table'] = {
"min_rcu": min(table_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"min_wcu": min(table_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"max_rcu": max(table_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"max_wcu": max(table_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"avg_rcu": sum(table_rcu_detailed['MetricDataResults'][0]['Values']) / len(table_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"avg_wcu": sum(table_wcu_detailed['MetricDataResults'][0]['Values']) / len(table_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"p50_rcu": np.percentile(np.sort(table_rcu_detailed['MetricDataResults'][0]['Values']), 50) / 300,
"p50_wcu": np.percentile(np.sort(table_wcu_detailed['MetricDataResults'][0]['Values']), 50) / 300,
"p95_rcu": np.percentile(np.sort(table_rcu_detailed['MetricDataResults'][0]['Values']), 95) / 300,
"p95_wcu": np.percentile(np.sort(table_wcu_detailed['MetricDataResults'][0]['Values']), 95) / 300,
}
# Calculate GSI
for gsi in summary['GSI'].keys():
gsi_rcu_detailed = self.__cloudwatch_request(
self._cloudwatch,
metric_name='ConsumedReadCapacityUnits',
dimensions=[{'Name': 'TableName', 'Value': self._args.table}, {'Name': 'GlobalSecondaryIndexName', 'Value': gsi}],
period=300,
stat='Sum',
start_time=self._start_time,
end_time=self._end_time
)
gsi_wcu_detailed = self.__cloudwatch_request(
self._cloudwatch,
metric_name='ConsumedWriteCapacityUnits',
dimensions=[{'Name': 'TableName', 'Value': self._args.table}, {'Name': 'GlobalSecondaryIndexName', 'Value': gsi}],
period=300,
stat='Sum',
start_time=self._start_time,
end_time=self._end_time
)
cus['gsis'][gsi] = {
"min_rcu": min(gsi_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"min_wcu": min(gsi_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"max_rcu": max(gsi_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"max_wcu": max(gsi_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"avg_rcu": sum(gsi_rcu_detailed['MetricDataResults'][0]['Values']) / len(gsi_rcu_detailed['MetricDataResults'][0]['Values']) / 300,
"avg_wcu": sum(gsi_wcu_detailed['MetricDataResults'][0]['Values']) / len(gsi_wcu_detailed['MetricDataResults'][0]['Values']) / 300,
"p50_rcu": 0 if len(gsi_rcu_detailed['MetricDataResults'][0]['Values']) == 0 else np.percentile(np.sort(gsi_rcu_detailed['MetricDataResults'][0]['Values']), 50) / 300,
"p50_wcu": 0 if len(gsi_wcu_detailed['MetricDataResults'][0]['Values']) == 0 else np.percentile(np.sort(gsi_wcu_detailed['MetricDataResults'][0]['Values']), 50) / 300,
"p95_rcu": 0 if len(gsi_rcu_detailed['MetricDataResults'][0]['Values']) == 0 else np.percentile(np.sort(gsi_rcu_detailed['MetricDataResults'][0]['Values']), 95) / 300,
"p95_wcu": 0 if len(gsi_wcu_detailed['MetricDataResults'][0]['Values']) == 0 else np.percentile(np.sort(gsi_wcu_detailed['MetricDataResults'][0]['Values']), 95) / 300,
}
# Show table
table_provisioned_rcu = f"{summary['ProvisionedRCU']} RCU ({summary['Autoscaling']['dynamodb:table:ReadCapacityUnits']['minimum_capacity_units']} - {summary['Autoscaling']['dynamodb:table:ReadCapacityUnits']['maximum_capacity_units']} | {summary['Autoscaling']['dynamodb:table:ReadCapacityUnits']['target_utilization']}%)" if summary['Autoscaling'] else f"{summary['ProvisionedRCU']} RCU"
table_provisioned_wcu = f"{summary['ProvisionedWCU']} WCU ({summary['Autoscaling']['dynamodb:table:WriteCapacityUnits']['minimum_capacity_units']} - {summary['Autoscaling']['dynamodb:table:WriteCapacityUnits']['maximum_capacity_units']} | {summary['Autoscaling']['dynamodb:table:WriteCapacityUnits']['target_utilization']}%)" if summary['Autoscaling'] else f"{summary['ProvisionedWCU']} WCU"
table = Table(title="", title_style='#FFA500', title_justify="left")
columns = ['Name','Min','Med','Avg','P95','Max','Provisioned']
rows = [[
f"[Table] {self._args.table}",
f"{cus['table']['min_rcu']:.3f} RCU, {cus['table']['min_wcu']:.3f} WCU",
f"{cus['table']['p50_rcu']:.3f} RCU, {cus['table']['p50_wcu']:.3f} WCU",
f"{cus['table']['avg_rcu']:.3f} RCU, {cus['table']['avg_wcu']:.3f} WCU",
f"{cus['table']['p95_rcu']:.3f} RCU, {cus['table']['p95_wcu']:.3f} WCU",
f"{cus['table']['max_rcu']:.3f} RCU, {cus['table']['max_wcu']:.3f} WCU",
f"{table_provisioned_rcu}, {table_provisioned_wcu}",
]]
for k, v in cus['gsis'].items():
gsi_provisioned_rcu = f"{summary['GSI'][k]['ProvisionedRCU']} RCU ({summary['GSI'][k]['Autoscaling']['dynamodb:index:ReadCapacityUnits']['minimum_capacity_units']} - {summary['GSI'][k]['Autoscaling']['dynamodb:index:ReadCapacityUnits']['maximum_capacity_units']} | {summary['GSI'][k]['Autoscaling']['dynamodb:index:ReadCapacityUnits']['target_utilization']}%)" if summary['GSI'][k]['Autoscaling'] else f"{summary['GSI'][k]['ProvisionedRCU']} RCU"
gsi_provisioned_wcu = f"{summary['GSI'][k]['ProvisionedWCU']} WCU ({summary['GSI'][k]['Autoscaling']['dynamodb:index:WriteCapacityUnits']['minimum_capacity_units']} - {summary['GSI'][k]['Autoscaling']['dynamodb:index:WriteCapacityUnits']['maximum_capacity_units']} | {summary['GSI'][k]['Autoscaling']['dynamodb:index:WriteCapacityUnits']['target_utilization']}%)" if summary['GSI'][k]['Autoscaling'] else f"{summary['GSI'][k]['ProvisionedWCU']} WCU"
row = [
f"[GSI] {k}",
f"{v['min_rcu']:.3f} RCU, {v['min_wcu']:.3f} WCU",
f"{v['p50_rcu']:.3f} RCU, {v['p50_wcu']:.3f} WCU",
f"{v['avg_rcu']:.3f} RCU, {v['avg_wcu']:.3f} WCU",
f"{v['p95_rcu']:.3f} RCU, {v['p95_wcu']:.3f} WCU",
f"{v['max_rcu']:.3f} RCU, {v['max_wcu']:.3f} WCU",
f"{gsi_provisioned_rcu}, {gsi_provisioned_wcu}",
]
rows.append(row)
for i, column in enumerate(columns):
table.add_column(column, style=f"{'bold ' if i == 0 else ''}#a2bffe")
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Return data
return cus
def throttling(self, summary, cup):
def calc(dimensions):
data = {'Read': {}, 'Write': {}}
start_time = datetime.fromisoformat(self._start_time).replace(hour=0, minute=0, second=0, microsecond=0)
end_time = datetime.fromisoformat(self._end_time)
read_throttle_events_1d = self.__cloudwatch_request(self._cloudwatch, 'ReadThrottleEvents', dimensions, 86400, 'Sum', self._start_time, self._end_time)
write_throttle_events_1d = self.__cloudwatch_request(self._cloudwatch, 'WriteThrottleEvents', dimensions, 86400, 'Sum', self._start_time, self._end_time)
read_timestamps = [i.strftime('%Y-%m-%d') for i in read_throttle_events_1d['MetricDataResults'][0]['Timestamps']]
read_values = read_throttle_events_1d['MetricDataResults'][0]['Values']
write_timestamps = [i.strftime('%Y-%m-%d') for i in write_throttle_events_1d['MetricDataResults'][0]['Timestamps']]
write_values = write_throttle_events_1d['MetricDataResults'][0]['Values']
while start_time < end_time:
# Calculate Throttling Reads
try:
data['Read'][start_time.strftime('%Y-%m-%d')] = read_values[read_timestamps.index(start_time.strftime('%Y-%m-%d'))]
except ValueError:
data['Read'][start_time.strftime('%Y-%m-%d')] = 0
# Calculate Throttling Writes
try:
data['Write'][start_time.strftime('%Y-%m-%d')] = write_values[write_timestamps.index(start_time.strftime('%Y-%m-%d'))]
except ValueError:
data['Write'][start_time.strftime('%Y-%m-%d')] = 0
# Iterate one more day
start_time += timedelta(hours=24)
return data
# Calculate Throttling
self._console.print("+------------+", style="#f49ac2")
self._console.print("| THROTTLING |", style="#f49ac2")
self._console.print("+------------+", style="#f49ac2")
throttling = {
'table': calc(dimensions=[{'Name': 'TableName', 'Value': self._args.table}]),
'GSI': {},
}
for gsi in summary['GSI'].keys():
throttling['GSI'][gsi] = calc(dimensions=[{'Name': 'TableName', 'Value': self._args.table},{'Name': 'GlobalSecondaryIndexName', 'Value': gsi}])
# Print Throttling
table = Table(title="", title_style='#FFA500', title_justify="left", show_lines=True)
columns = ['Date','Table'] + [f"[GSI] {i}" for i in summary['GSI'].keys()]
for i, column in enumerate(columns):
table.add_column(column, style='bold #a2bffe' if i == 0 else '')
rows = []
for date in throttling['table']['Read'].keys():
table_row = [date]
for element in [throttling['table']] + list(throttling['GSI'].values()):
table_reads_writes = Table(title="", title_style='#FFA500', title_justify="left")
table_reads_writes_columns = ['Reads', 'Writes']
table_reads_writes_rows = [[
f"{'[bright_red]' if int(element['Read'][date]) > 0 else '[bright_green]'}{int(element['Read'][date])}",
f"{'[bright_red]' if int(element['Write'][date]) > 0 else '[bright_green]'}{int(element['Write'][date])}"
]]
for column in table_reads_writes_columns:
table_reads_writes.add_column(column)
for row in table_reads_writes_rows:
table_reads_writes.add_row(*row)
table_row.append(table_reads_writes)
rows.append(table_row)
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Print Tips table
table = Table(title="", title_style='#FFA500', title_justify="left")
table.add_column('💡 Tips')
rows = [
["If thottling is greater than 0:"],
["- Increase the provisioned Capacity Units."],
["- Check 'ReadThrottleEvents' and 'WriteThrottleEvents' CloudWatch metrics to see if it happens regularly or sporadically."],
["- Check if there is a hot partition key issue. Limits: 3000 RCU and 1000 WCU."],
["- Enable 'Contributor Insights' for this DynamoDB table to get the most accessed items and most throttled keys."],
["-- DynamoDB --> Tables --> Update Settings --> Monitor --> Turn on CloudWatch Contributor Insights"],
["-- Cloudwatch --> Contributor Insights"],
]
for row in rows:
table.add_row(*row)
console = Console()
console.print(table)
# Return calculated data
return throttling
def cost_comparison(self, summary, cus, cup):
def get_aws_pricing():
params = {
"ServiceCode": "AmazonDynamoDB",
"Filters": [
{
'Type': 'TERM_MATCH',
'Field': 'regionCode',
'Value': self._args.region,
},
]
}
pricing = []
# Get pricing
while True:
response = self._pricing.get_products(**params)['PriceList']
for i in response:
data = json.loads(i)
pricing.append({
"usagetype": data['product']['attributes']['usagetype'],
"group": data['product']['attributes'].get('group'),
"operation": data['product']['attributes']['operation'],
"modes": list(data['terms'].keys()),
"unit": list(list(list(data['terms'].values())[0].values())[0]['priceDimensions'].values())[0]['unit'],
"price": {mode: max([j['pricePerUnit'] for i in value.values() for j in i['priceDimensions'].values()], key=lambda x: x.get('USD', 0))['USD'] for mode, value in data['terms'].items()}
})
if 'NextToken' not in response:
break
params['NextToken'] = response['NextToken']
# Parse pricing
aws_pricing = {
"on-demand": {
"standard": {
"wru": Decimal(next(filter(lambda x: x['group'] == 'DDB-WriteUnits' and x['operation'] == 'PayPerRequestThroughput', pricing))['price']['OnDemand']),
"rru": Decimal(next(filter(lambda x: x['group'] == 'DDB-ReadUnits' and x['operation'] == 'PayPerRequestThroughput', pricing))['price']['OnDemand']),
"storage": Decimal(next(filter(lambda x: x['usagetype'].endswith('TimedStorage-ByteHrs') and not x['usagetype'].endswith('IA-TimedStorage-ByteHrs'), pricing))['price']['OnDemand']),
},
"infrequent": {
"wru": Decimal(next(filter(lambda x: x['group'] == 'DDB-WriteUnitsIA' and x['operation'] == 'PayPerRequestThroughput', pricing))['price']['OnDemand']),
"rru": Decimal(next(filter(lambda x: x['group'] == 'DDB-ReadUnitsIA' and x['operation'] == 'PayPerRequestThroughput', pricing))['price']['OnDemand']),
"storage": Decimal(next(filter(lambda x: x['usagetype'].endswith('IA-TimedStorage-ByteHrs'), pricing))['price']['OnDemand']),
}
},
"provisioned": {
"standard": {
"wcu": Decimal(next(filter(lambda x: x['group'] == 'DDB-WriteUnits' and x['operation'] == 'CommittedThroughput', pricing))['price']['OnDemand']),
"rcu": Decimal(next(filter(lambda x: x['group'] == 'DDB-ReadUnits' and x['operation'] == 'CommittedThroughput', pricing))['price']['OnDemand']),
"storage": Decimal(next(filter(lambda x: x['usagetype'].endswith('TimedStorage-ByteHrs') and not x['usagetype'].endswith('IA-TimedStorage-ByteHrs'), pricing))['price']['OnDemand']),
},
"infrequent": {
"wcu": Decimal(next(filter(lambda x: x['group'] == 'DDB-WriteUnitsIA' and x['operation'] == 'CommittedThroughput', pricing))['price']['OnDemand']),
"rcu": Decimal(next(filter(lambda x: x['group'] == 'DDB-ReadUnitsIA' and x['operation'] == 'CommittedThroughput', pricing))['price']['OnDemand']),
"storage": Decimal(next(filter(lambda x: x['usagetype'].endswith('IA-TimedStorage-ByteHrs'), pricing))['price']['OnDemand']),
}
}
}
return aws_pricing
def calc(aws_pricing, element):
costs = {
"current": {
"standard": {"rcu": 0, "wcu": 0},
"infrequent": {"rcu": 0, "wcu": 0},
},
"on-demand": {
"standard": {"rcu": 0, "wcu": 0},
"infrequent": {"rcu": 0, "wcu": 0},
},
"provisioned": {
"standard": {"rcu": 0, "wcu": 0},
"infrequent": {"rcu": 0, "wcu": 0},
},
"capacity": {
"rcu": {"min": float('inf'), "max": float('-inf') },
"wcu": {"min": float('inf'), "max": float('-inf') },
}
}
for v in element.values():
# Current - RCU
if v['ProvisionedReadCapacityUnits'] > 0:
costs['current']['standard']['rcu'] += aws_pricing['provisioned']['standard']['rcu'] * Decimal(v['ProvisionedReadCapacityUnits']) * 5/60
costs['current']['infrequent']['rcu'] += aws_pricing['provisioned']['infrequent']['rcu'] * Decimal(v['ProvisionedReadCapacityUnits']) * 5/60
else:
costs['current']['standard']['rcu'] += aws_pricing['on-demand']['standard']['rru'] * 2 * Decimal(v['ConsumedReadCapacityUnits'])
costs['current']['infrequent']['rcu'] += aws_pricing['on-demand']['infrequent']['rru'] * 2 * Decimal(v['ConsumedReadCapacityUnits'])
# Current - WCU
if v['ProvisionedWriteCapacityUnits'] > 0:
costs['current']['standard']['wcu'] += aws_pricing['provisioned']['standard']['wcu'] * Decimal(v['ProvisionedWriteCapacityUnits']) * 5/60
costs['current']['infrequent']['wcu'] += aws_pricing['provisioned']['infrequent']['wcu'] * Decimal(v['ProvisionedWriteCapacityUnits']) * 5/60
else:
costs['current']['standard']['wcu'] += aws_pricing['on-demand']['standard']['wru'] * Decimal(v['ConsumedWriteCapacityUnits'])
costs['current']['infrequent']['wcu'] += aws_pricing['on-demand']['infrequent']['wru'] * Decimal(v['ConsumedWriteCapacityUnits'])
# On-Demand
costs['on-demand']['standard']['rcu'] += aws_pricing['on-demand']['standard']['rru'] * 2 * Decimal(v['ConsumedReadCapacityUnits'])
costs['on-demand']['infrequent']['rcu'] += aws_pricing['on-demand']['infrequent']['rru'] * 2 * Decimal(v['ConsumedReadCapacityUnits'])
costs['on-demand']['standard']['wcu'] += aws_pricing['on-demand']['standard']['wru'] * Decimal(v['ConsumedWriteCapacityUnits'])
costs['on-demand']['infrequent']['wcu'] += aws_pricing['on-demand']['infrequent']['wru'] * Decimal(v['ConsumedWriteCapacityUnits'])
# Provisioned - RCU
if v['ProvisionedReadCapacityUnits'] > 0:
costs['provisioned']['standard']['rcu'] += aws_pricing['provisioned']['standard']['rcu'] * Decimal(v['ProvisionedReadCapacityUnits']) * 5/60
costs['provisioned']['infrequent']['rcu'] += aws_pricing['provisioned']['infrequent']['rcu'] * Decimal(v['ProvisionedReadCapacityUnits']) * 5/60
else:
costs['provisioned']['standard']['rcu'] += aws_pricing['provisioned']['standard']['rcu'] * max(Decimal(v['ConsumedReadCapacityUnits'])/300, 1) * 5/60
costs['provisioned']['infrequent']['rcu'] += aws_pricing['provisioned']['infrequent']['rcu'] * max(Decimal(v['ConsumedReadCapacityUnits'])/300, 1) * 5/60
# Provisioned - WCU
if v['ProvisionedWriteCapacityUnits'] > 0:
costs['provisioned']['standard']['wcu'] += aws_pricing['provisioned']['standard']['wcu'] * Decimal(v['ProvisionedWriteCapacityUnits']) * 5/60
costs['provisioned']['infrequent']['wcu'] += aws_pricing['provisioned']['infrequent']['wcu'] * Decimal(v['ProvisionedWriteCapacityUnits']) * 5/60
else:
costs['provisioned']['standard']['wcu'] += aws_pricing['provisioned']['standard']['wcu'] * max(Decimal(v['ConsumedWriteCapacityUnits'])/300, 1) * 5/60
costs['provisioned']['infrequent']['wcu'] += aws_pricing['provisioned']['infrequent']['wcu'] * max(Decimal(v['ConsumedWriteCapacityUnits'])/300, 1) * 5/60
# Calculate min and max capacity
costs['capacity'] = {
"rcu": {
"min": min(costs['capacity']['rcu']['min'], v['ProvisionedReadCapacityUnits']),
"max": max(costs['capacity']['rcu']['max'], v['ProvisionedReadCapacityUnits']),
},
"wcu": {
"min": min(costs['capacity']['rcu']['min'], v['ProvisionedWriteCapacityUnits']),
"max": max(costs['capacity']['rcu']['max'], v['ProvisionedWriteCapacityUnits']),
},
}
return costs
def build_table_1(table, costs, object, name):
optimal = []
if costs['on-demand']['standard']['rcu'] + costs['on-demand']['standard']['wcu'] <= costs['provisioned']['standard']['rcu'] + costs['provisioned']['standard']['wcu']:
optimal.append('on-demand')
elif costs['on-demand']['standard']['rcu'] + costs['on-demand']['standard']['wcu'] >= costs['provisioned']['standard']['rcu'] + costs['provisioned']['standard']['wcu']:
optimal.append('provisioned')
table_current = Table(title="", title_style='#FFA500', title_justify="left")
table_current_columns = ['Reads', 'Writes']
table_current_rows = [[
f"${costs['current']['standard']['rcu']:.4f}",
f"${costs['current']['standard']['wcu']:.4f}",
]]
for column in table_current_columns:
table_current.add_column(column)
for row in table_current_rows:
table_current.add_row(*row)
table_ondemand = Table(title="", title_style='#FFA500', title_justify="left")
table_ondemand_rows = [[
f"${costs['on-demand']['standard']['rcu']:.4f}",
f"${costs['on-demand']['standard']['wcu']:.4f}",
]]
for column in table_current_columns:
table_ondemand.add_column(column, style='bright_green' if 'on-demand' in optimal else '')
for row in table_ondemand_rows:
table_ondemand.add_row(*row)
table_provisioned = Table(title="", title_style='#FFA500', title_justify="left")
table_provisioned_reads_min = max(cus['table']['min_rcu'], 1) if object == 'table' else max(cus['gsis'][name]['min_rcu'], 1)
table_provisioned_reads_max = max(min(cus['table']['max_rcu'], max(cup['table']['agg']['RCU'], default=cus['table']['max_rcu'])), 1) if object == 'table' else max(min(cus['gsis'][name]['max_rcu'], max(cup['GSI'][name]['agg']['RCU'], default=cus['gsis'][name]['max_rcu'])), 1)
table_provisioned_writes_min = max(cus['table']['min_wcu'], 1) if object == 'table' else max(cus['gsis'][name]['min_wcu'], 1)
table_provisioned_writes_max = max(min(cus['table']['max_wcu'], max(cup['table']['agg']['WCU'], default=cus['table']['max_wcu'])), 1) if object == 'table' else max(min(cus['gsis'][name]['max_wcu'], max(cup['GSI'][name]['agg']['WCU'], default=cus['gsis'][name]['max_wcu'])), 1)
table_provisioned_columns = [
f"Reads ({int(table_provisioned_reads_min)} - {int(table_provisioned_reads_max)})",
f"Writes ({int(table_provisioned_writes_min)} - {int(table_provisioned_writes_max)})",
]
table_provisioned_rows = [[
f"${costs['provisioned']['standard']['rcu']:.4f}",
f"${costs['provisioned']['standard']['wcu']:.4f}",
]]
for column in table_provisioned_columns:
table_provisioned.add_column(column, style='bright_green' if 'provisioned' in optimal else '')
for row in table_provisioned_rows:
table_provisioned.add_row(*row)
# Print table
rows = [[
f"[{'Table' if object == 'table' else 'GSI'}] {name}",
table_current,
table_ondemand,
table_provisioned,
]]
for row in rows:
table.add_row(*row)
return table
def build_table_2(aws_pricing, table, costs):
capacity = {
'current': {
'standard': costs['table']['current']['standard']['rcu'] + costs['table']['current']['standard']['wcu'] + sum([i['current']['standard']['rcu'] + i['current']['standard']['wcu'] for i in costs['gsi'].values()]),
'infrequent': costs['table']['current']['infrequent']['rcu'] + costs['table']['current']['infrequent']['wcu'] + sum([i['current']['infrequent']['rcu'] + i['current']['infrequent']['wcu'] for i in costs['gsi'].values()]),
},
'ondemand': {
'standard': costs['table']['on-demand']['standard']['rcu'] + costs['table']['on-demand']['standard']['wcu'] + sum([i['on-demand']['standard']['rcu'] + i['on-demand']['standard']['wcu'] for i in costs['gsi'].values()]),
'infrequent': costs['table']['on-demand']['infrequent']['rcu'] + costs['table']['on-demand']['infrequent']['wcu'] + sum([i['on-demand']['infrequent']['rcu'] + i['on-demand']['infrequent']['wcu'] for i in costs['gsi'].values()]),
},
'provisioned': {
'standard': costs['table']['provisioned']['standard']['rcu'] + costs['table']['provisioned']['standard']['wcu'] + sum([i['provisioned']['standard']['rcu'] + i['provisioned']['standard']['wcu'] for i in costs['gsi'].values()]),
'infrequent': costs['table']['provisioned']['infrequent']['rcu'] + costs['table']['provisioned']['infrequent']['wcu'] + sum([i['provisioned']['infrequent']['rcu'] + i['provisioned']['infrequent']['wcu'] for i in costs['gsi'].values()]),
},
}
# Calculate Capacity
table_current = Table(title="", title_justify="left")
table_current_columns = ['Standard', 'Infrequent']
table_current_rows = [[
f"${capacity['current']['standard']:.4f}",
f"${capacity['current']['infrequent']:.4f}",
]]
for column in table_current_columns:
table_current.add_column(column)
for row in table_current_rows:
table_current.add_row(*row)
table_ondemand = Table(title="", title_justify="left")
table_ondemand_rows = [[
f"${capacity['ondemand']['standard']:.4f}",
f"${capacity['ondemand']['infrequent']:.4f}",
]]
for column in table_current_columns:
table_ondemand.add_column(column)
for row in table_ondemand_rows:
table_ondemand.add_row(*row)
table_provisioned = Table(title="", title_justify="left")
table_provisioned_rows = [[
f"${capacity['provisioned']['standard']:.4f}",
f"${capacity['provisioned']['infrequent']:.4f}",
]]
for column in table_current_columns:
table_provisioned.add_column(column)
for row in table_provisioned_rows:
table_provisioned.add_row(*row)
# Add rows to the table
rows = [[
"Capacity Units",
table_current,
table_ondemand,
table_provisioned,
]]
for row in rows:
table.add_row(*row)
# Calculate Storage
storage_standard = Decimal(summary['TableSizeBytes']) / (1024 ** 3) * aws_pricing['on-demand']['standard']['storage']
storage_infrequent = Decimal(summary['TableSizeBytes']) / (1024 ** 3) * aws_pricing['on-demand']['infrequent']['storage']
table_current = Table(title="", title_justify="left")
table_current_columns = ['Standard', 'Infrequent']
table_current_rows = [[
f"${(storage_standard):.4f}",
f"${(storage_infrequent):.4f}",
]]
for column in table_current_columns:
table_current.add_column(column)
for row in table_current_rows:
table_current.add_row(*row)
table_ondemand = Table(title="", title_justify="left")
table_ondemand_rows = [[
f"${(storage_standard):.4f}",
f"${(storage_infrequent):.4f}",
]]
for column in table_current_columns:
table_ondemand.add_column(column)
for row in table_ondemand_rows:
table_ondemand.add_row(*row)
table_provisioned = Table(title="", title_justify="left")
table_provisioned_rows = [[
f"${(storage_standard):.4f}",
f"${(storage_infrequent):.4f}",
]]
for column in table_current_columns:
table_provisioned.add_column(column)
for row in table_provisioned_rows:
table_provisioned.add_row(*row)
# Add rows to the tabñe
rows = [[
"Storage",
table_current,
table_ondemand,
table_provisioned,
]]
for row in rows:
table.add_row(*row)
# Calculate Total
total = {
'current': {
'standard': capacity['current']['standard'] + storage_standard,
'infrequent': capacity['current']['infrequent'] + storage_infrequent,
},
'ondemand': {
'standard': capacity['ondemand']['standard'] + storage_standard,
'infrequent': capacity['ondemand']['infrequent'] + storage_infrequent,
},
'provisioned': {
'standard': capacity['provisioned']['standard'] + storage_standard,
'infrequent': capacity['provisioned']['infrequent'] + storage_infrequent,
}
}
min_keys, _ = min(
(((key1, key2), value2) for key1, value1 in total.items() if key1 != 'current' for key2, value2 in value1.items()),
key=lambda x: x[1]
)
table_current = Table(title="", title_justify="left")
table_current_columns = ['Standard', 'Infrequent']
table_current_rows = [[
f"${total['current']['standard']:.4f}",
f"${total['current']['infrequent']:.4f}",
]]
for column in table_current_columns:
table_current.add_column(column)
for row in table_current_rows:
table_current.add_row(*row)
table_ondemand = Table(title="", title_justify="left")
table_ondemand_rows = [[
f"${total['ondemand']['standard']:.4f}",
f"${total['ondemand']['infrequent']:.4f}",
]]
for column in table_current_columns:
color = 'bright_green' if min_keys[0] == 'ondemand' and column.lower() == min_keys[1] else ''
table_ondemand.add_column(column, style=color, header_style=color)
for row in table_ondemand_rows:
table_ondemand.add_row(*row)
table_provisioned = Table(title="", title_justify="left")
table_provisioned_rows = [[
f"${total['provisioned']['standard']:.4f}",
f"${total['provisioned']['infrequent']:.4f}",
]]
for column in table_current_columns:
color = 'bright_green' if min_keys[0] == 'provisioned' and column.lower() == min_keys[1] else ''
table_provisioned.add_column(column, style=color, header_style=color)
for row in table_provisioned_rows:
table_provisioned.add_row(*row)
# Add rows to the tabñe
rows = [[
"Total Price",
table_current,
table_ondemand,
table_provisioned,
]]
for row in rows:
table.add_row(*row)
# Return table
return table
self._console.print("+-------+", style="#f49ac2")
self._console.print("| COSTS |", style="#f49ac2")
self._console.print("+-------+", style="#f49ac2")
# Get Pricing
aws_pricing = get_aws_pricing()
# Calculate Costs
costs = {
'table': calc(aws_pricing, cup['table']['raw']),
'gsi': {}
}
for gsi in cup['GSI'].keys():
costs['gsi'][gsi] = calc(aws_pricing, cup['GSI'][gsi]['raw'])
# Build Table - Cost comparison
table = Table(title=f"- Estimation Cost for Tables and GSIs over the last {self._args.period} days.", title_style='#FFA500', title_justify="left", show_lines=True)
columns = ['Name', '⚡️ Current', '🚀 On-Demand', f"✅ Provisioned"]
for i, column in enumerate(columns):
table.add_column(column, style='bold #a2bffe' if i == 0 else '')
table = build_table_1(table, costs['table'], 'table', self._args.table)
for k, v in costs['gsi'].items():
table = build_table_1(table, v, 'gsi', k)
# Print Table - Cost comparison
console = Console()
console.print(table)
# Build Table - Standard vs Infrequent
table = Table(title=f"- Estimation Cost over the last {self._args.period} days.", title_style='#FFA500', title_justify="left", show_lines=True)
columns = ['Name', '⚡️ Current', '🚀 On-Demand', f"✅ Provisioned"]
for i, column in enumerate(columns):
table.add_column(column, style='bold #a2bffe' if i == 0 else '')
table = build_table_2(aws_pricing, table, costs)
# Print Table - Standard vs Infrequent
console = Console()
console.print(table)
####################
# Internal Methods #
####################
def __cloudwatch_request(self, cloudwatch, metric_name, dimensions, period, stat, start_time, end_time):
response = cloudwatch.get_metric_data(
MetricDataQueries=[
{
'Id': 'm1',
'MetricStat': {
'Metric': {
'Namespace': 'AWS/DynamoDB',
'MetricName': metric_name,
'Dimensions': dimensions
},
'Period': period,
'Stat': stat,
},
'ReturnData': True
},
],
StartTime=start_time,
EndTime=end_time
)
return response
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment