Skip to content

Instantly share code, notes, and snippets.

@betamoo
Created January 22, 2019 18:58
Show Gist options
  • Save betamoo/ae8cff1022c97bce6718db06bb9d06fd to your computer and use it in GitHub Desktop.
Save betamoo/ae8cff1022c97bce6718db06bb9d06fd to your computer and use it in GitHub Desktop.
import boto3
import datetime
from datetime import datetime, timedelta
# Both service clients are pinned to the same region, so only tables (and
# their CloudWatch metrics) in that region are ever inspected by this script.
_AWS_REGION = 'us-east-1'
dynamodb = boto3.client('dynamodb', region_name=_AWS_REGION)
cloudwatch = boto3.client('cloudwatch', region_name=_AWS_REGION)
def getCapacities(table_name):
    """Return the provisioned ``(read, write)`` capacity units of a table.

    Calls ``describe_table`` on the module-level ``dynamodb`` client and
    reads ``ReadCapacityUnits`` / ``WriteCapacityUnits`` from the table's
    ``ProvisionedThroughput`` section.
    """
    description = dynamodb.describe_table(TableName=table_name)
    throughput = description['Table']['ProvisionedThroughput']
    read_units = throughput['ReadCapacityUnits']
    write_units = throughput['WriteCapacityUnits']
    return read_units, write_units
def get_consumed_read_capacities(table_name):
    """Return the highest consumed read rate seen over eight one-day windows.

    Queries ``_get_consumed_read_capacities`` for day offsets 0 through 7
    (each offset covers the 24 hours ending that many days ago) and returns
    the largest of the eight per-window peaks.
    """
    daily_peaks = (
        _get_consumed_read_capacities(table_name, days_back)
        for days_back in range(8)
    )
    return max(daily_peaks)
def _get_consumed_read_capacities(table_name, delta=0):
    """Return the peak consumed read units per second within one 24-hour window.

    The window ends ``delta`` days before the current UTC time and starts one
    day earlier. CloudWatch is asked for the ``Sum`` of
    ``ConsumedReadCapacityUnits`` in 60-second periods; each sum is divided by
    60 to turn it into a per-second rate. Returns 0 when CloudWatch reports no
    datapoints for the window.
    """
    window_end = datetime.utcnow() - timedelta(days=delta)
    window_start = window_end - timedelta(days=1)
    stats = cloudwatch.get_metric_statistics(
        Namespace='AWS/DynamoDB',
        MetricName='ConsumedReadCapacityUnits',
        Dimensions=[{'Name': "TableName", "Value": table_name}],
        StartTime=window_start,
        EndTime=window_end,
        Period=60,
        Statistics=['Sum'],
    )
    per_second_rates = [point['Sum'] / (60.0) for point in stats['Datapoints']]
    # The trailing 0 keeps max() well-defined when there are no datapoints.
    per_second_rates.append(0)
    return max(per_second_rates)
def get_consumed_write_capacities(table_name):
    """Return the highest consumed write rate seen over eight one-day windows.

    Queries ``_get_consumed_write_capacities`` for day offsets 0 through 7
    (each offset covers the 24 hours ending that many days ago) and returns
    the largest of the eight per-window peaks.
    """
    daily_peaks = (
        _get_consumed_write_capacities(table_name, days_back)
        for days_back in range(8)
    )
    return max(daily_peaks)
def _get_consumed_write_capacities(table_name, delta=0):
    """Return the peak consumed write units per second within one 24-hour window.

    The window ends ``delta`` days before the current UTC time and starts one
    day earlier. CloudWatch is asked for the ``Sum`` of
    ``ConsumedWriteCapacityUnits`` in 60-second periods; each sum is divided
    by 60 to turn it into a per-second rate. Returns 0 when CloudWatch reports
    no datapoints for the window.
    """
    window_end = datetime.utcnow() - timedelta(days=delta)
    window_start = window_end - timedelta(days=1)
    stats = cloudwatch.get_metric_statistics(
        Namespace='AWS/DynamoDB',
        MetricName='ConsumedWriteCapacityUnits',
        Dimensions=[{'Name': "TableName", "Value": table_name}],
        StartTime=window_start,
        EndTime=window_end,
        Period=60,
        Statistics=['Sum'],
    )
    per_second_rates = [point['Sum'] / (60.0) for point in stats['Datapoints']]
    # The trailing 0 keeps max() well-defined when there are no datapoints.
    per_second_rates.append(0)
    return max(per_second_rates)
def list_table_names():
    """Yield every table name returned by ``dynamodb.list_tables``, page by page.

    The first request is sent without arguments; each following request
    passes the previous page's ``LastEvaluatedTableName`` as
    ``ExclusiveStartTableName``. Iteration stops as soon as a page is empty or
    carries no ``LastEvaluatedTableName``.
    """
    resume_after = None
    while True:
        request = {}
        if resume_after:
            request['ExclusiveStartTableName'] = resume_after
        page = dynamodb.list_tables(**request)

        resume_after = page.get('LastEvaluatedTableName', None)
        names = page.get('TableNames', [])
        for name in names:
            yield name

        # Keep paging only while the page had names AND a continuation marker.
        if not (names and resume_after):
            return
def is_staging(table_name):
    """Return True when the substring ``staging`` occurs anywhere in *table_name*.

    The match is case-sensitive.
    """
    marker = 'staging'
    return marker in table_name
def is_production(table_name):
    """Return True when the substring ``production`` occurs anywhere in *table_name*.

    The match is case-sensitive.
    """
    marker = 'production'
    return marker in table_name
# Index production tables two ways:
#   tables_services: service prefix (text before the first '-') -> set of table names
#   consumed_tables: table name -> peak consumed read rate over the last 8 days
# NOTE(review): the original indentation was lost; this assumes both the
# grouping and the CloudWatch lookup apply to production tables only - confirm.
# NOTE(review): neither dict is read again in this file, and every table here
# costs 8 CloudWatch calls that sort_tables() repeats - confirm it is needed.
tables_services = {}
consumed_tables = {}
for t in list_table_names():
    if 'production' not in t:
        continue
    service_name = t.split('-')[0]
    tables_services.setdefault(service_name, set()).add(t)
    consumed_tables[t] = get_consumed_read_capacities(t)
# Module-level state filled in by sort_tables().
# all_tables: every table name seen, regardless of environment.
all_tables = set()
# Utilisation buckets (consumed / provisioned, for both reads and writes):
#   tables_55: <= 55%   tables_60: <= 60%   tables_70: <= 70%
#   tables_80: everything else (i.e. above 70%).
tables_55, tables_60, tables_70, tables_80 = (set() for _ in range(4))
# info: table name -> {"prov_read_write": (read, write), "cons_read_write": (read, write)}
info = dict()
def sort_tables():
    """Bucket every production table by its peak capacity utilisation.

    Walks ``list_table_names()``, records each name in ``all_tables`` and
    prints a progress line every 100 tables. Tables whose name contains
    ``staging``, or does not contain ``production``, are skipped.

    For each remaining table the 8-day peak consumed read/write rates and the
    provisioned read/write units are stored in ``info[table_name]``, and the
    table is added to exactly one bucket based on the larger of its read and
    write utilisation (consumed / provisioned):

    * ``tables_55`` - at most 55%
    * ``tables_60`` - at most 60%
    * ``tables_70`` - at most 70%
    * ``tables_80`` - anything above 70%

    Tables reporting zero (or negative) provisioned units are recorded in
    ``info`` but left out of every bucket, because no utilisation ratio can
    be computed for them.
    """
    for i, table_name in enumerate(list_table_names(), 1):
        all_tables.add(table_name)
        if i % 100 == 0:
            print("{} tables processed, uniqu:{}".format(i, len(all_tables)))

        if 'staging' in table_name or not is_production(table_name):
            continue

        read_cap = get_consumed_read_capacities(table_name)
        write_cap = get_consumed_write_capacities(table_name)
        read_prov, write_prov = getCapacities(table_name)
        info[table_name] = {
            "prov_read_write": (read_prov, write_prov),
            "cons_read_write": (read_cap, write_cap),
        }

        # A table with no provisioned throughput would otherwise raise
        # ZeroDivisionError below and abort the whole scan.
        # NOTE(review): on-demand (PAY_PER_REQUEST) tables are believed to be
        # reported with 0 units by DescribeTable - confirm against the API docs.
        if read_prov <= 0 or write_prov <= 0:
            continue

        # "read <= t and write <= t" is the same test as "max(read, write) <= t".
        peak_utilisation = max(read_cap / read_prov, write_cap / write_prov)
        if peak_utilisation <= 0.55:
            tables_55.add(table_name)
        elif peak_utilisation <= 0.6:
            tables_60.add(table_name)
        elif peak_utilisation <= 0.7:
            tables_70.add(table_name)
        else:
            tables_80.add(table_name)
# Runs the scan as soon as the module is executed: fills all_tables, the
# tables_55/60/70/80 buckets and info, which the code below reads.
sort_tables()
def get_app(name):
    """Return current and suggested provisioned capacity for table *name*.

    Reads the table's entry from the module-level ``info`` dict and returns a
    4-tuple ``(provisioned_read, suggested_read, provisioned_write,
    suggested_write)``. Each suggestion is the consumed rate scaled by
    100/55, rounded down to a multiple of 200, and never lower than the
    currently provisioned value.

    Raises ``KeyError`` if *name* has no entry in ``info``.
    """
    target_pct = 55
    step = 200

    def _sized_for_target(consumed):
        # NOTE(review): int() truncates, so the result is rounded DOWN to a
        # multiple of `step`; the suggestion can therefore still leave
        # utilisation above the 55% target - confirm this is intended.
        return int(consumed * 100 / (target_pct * step)) * step

    record = info[name]
    consumed_read = record['cons_read_write'][0]
    consumed_write = record['cons_read_write'][1]
    provisioned_read = record['prov_read_write'][0]
    provisioned_write = record['prov_read_write'][1]

    suggested_read = max(_sized_for_target(consumed_read), provisioned_read)
    suggested_write = max(_sized_for_target(consumed_write), provisioned_write)
    return provisioned_read, suggested_read, provisioned_write, suggested_write
# Report: every table outside the <= 55% bucket, in name order, for which
# get_app() suggests a read or write capacity different from the current one.
# Output line format: "<table>, <read now> -> <read new>, <write now> -> <write new>".
x = []
for bucket in (tables_60, tables_70, tables_80):
    x.extend(bucket)
x.sort()

for t in x:
    ttt = get_app(t)
    prov_read, new_read, prov_write, new_write = ttt
    if prov_read == new_read and prov_write == new_write:
        # Nothing to change for this table.
        continue
    print("{}, {} -> {}, {} -> {}".format(t, prov_read, new_read, prov_write, new_write))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment