Skip to content

Instantly share code, notes, and snippets.

@polius
Created May 13, 2024 09:19
Show Gist options
  • Save polius/b09fc0453be1b3ed7e5f2199be66e9a5 to your computer and use it in GitHub Desktop.
Generate a CSV file listing all EC2 volumes in an AWS account, along with some cost-saving recommendations.
import csv
import boto3
import numpy as np
import argparse
import threading
from datetime import datetime, timedelta, timezone
class main:
    """Generate 'data.csv' listing every EBS volume across all enabled AWS
    regions, together with 30 days of CloudWatch IOPS/throughput usage
    statistics and a cost-saving recommendation (gp3 migration or
    re-provisioning). Instantiating the class runs the whole scan.
    """

    def __init__(self):
        """Parse CLI arguments, build the boto3 session and run the scan."""
        parser = argparse.ArgumentParser()
        parser.add_argument('--profile', required=False, help='AWS profile name ($ aws configure --profile customer)')
        args = parser.parse_args()
        # Metric window: from midnight (UTC) 30 days ago up to the current minute.
        self._end_time = datetime.now(timezone.utc).replace(second=0, microsecond=0)
        self._start_time = (self._end_time - timedelta(days=30)).replace(hour=0, minute=0, second=0, microsecond=0)
        # profile_name=None falls back to the default credential chain.
        self._session = boto3.Session(profile_name=args.profile)
        # Start computing
        self.compute()
        # Show confirmation message
        print("Scan completed")

    def compute(self):
        """Scan every region, analyse each volume and append rows to data.csv."""
        headers = ['Region','ID','Type','State','Size','Iops','Throughput','Min_IOPS','Avg_IOPS','P95_IOPS','Max_IOPS','Min_Throughput','Avg_Throughput','P95_Throughput','Max_Throughput','Is_Optimal','Optimal_Type','Optimal_IOPS','Optimal_Throughput_MB']
        # newline='' is required by the csv module (avoids blank rows on Windows).
        with open('data.csv', 'w', newline='') as csv_file:
            csv.writer(csv_file).writerow(headers)
        # Get all enabled regions for the account.
        regions = [region['RegionName'] for region in self._session.client('ec2', region_name='us-east-1').describe_regions()['Regions']]
        for region in regions:
            # Create boto3 clients
            ec2_client = self._session.client('ec2', region_name=region)
            cloudwatch_client = self._session.client('cloudwatch', region_name=region)
            # FIX: paginate — a single describe_volumes call returns at most one
            # page (up to 500 volumes); the original silently truncated the list.
            volumes = []
            for page in ec2_client.get_paginator('describe_volumes').paginate():
                volumes.extend(page['Volumes'])
            data = []
            # Iterate over each volume
            for i, volume in enumerate(volumes):
                print(f"[{region}] [{i+1}/{len(volumes)}] {volume['VolumeId']} | Type: {volume['VolumeType']} | State: {volume['State']} | Size: {volume['Size']} | Iops: {volume.get('Iops', '-')} | Throughput: {volume.get('Throughput', '-')}")
                # Fetch the four raw metrics concurrently (one thread per metric).
                thread_data = {}
                threads = [
                    threading.Thread(target=self.__cloudwatch_request, args=(thread_data, cloudwatch_client, metric, volume['VolumeId']))
                    for metric in ('VolumeReadOps', 'VolumeWriteOps', 'VolumeReadBytes', 'VolumeWriteBytes')
                ]
                for t in threads:
                    t.start()
                for t in threads:
                    t.join()
                # Each datapoint is a 300-second 'Sum'; dividing by 300 yields per-second rates.
                overall_iops = [(r + w) / 300 for r, w in zip(thread_data['VolumeReadOps'], thread_data['VolumeWriteOps'])]
                overall_throughput = [(r + w) / 300 for r, w in zip(thread_data['VolumeReadBytes'], thread_data['VolumeWriteBytes'])]
                min_iops, avg_iops, p95_iops, max_iops = self.__stats(overall_iops)
                # Throughput is reported in MB/s (CloudWatch returns bytes), hence scale=1024**2.
                min_tp, avg_tp, p95_tp, max_tp = self.__stats(overall_throughput, scale=1024**2)
                is_optimal, optimal_type, optimal_iops, optimal_throughput = self.__recommend(volume, p95_iops, p95_tp)
                data.append([
                    region,
                    volume['VolumeId'],
                    volume['VolumeType'],
                    volume['State'],
                    volume['Size'],
                    volume.get('Iops', ''),
                    volume.get('Throughput', ''),
                    min_iops,
                    avg_iops,
                    p95_iops,
                    max_iops,
                    min_tp,
                    avg_tp,
                    p95_tp,
                    max_tp,
                    is_optimal,
                    optimal_type,
                    optimal_iops,
                    optimal_throughput,
                ])
            # Write region data into CSV file
            with open('data.csv', 'a', newline='') as csv_file:
                csv.writer(csv_file).writerows(data)

    @staticmethod
    def __stats(series, scale=1):
        """Return (min, avg, p95, max) of *series*, each divided by *scale*
        and rounded to the nearest integer; all zeros when *series* is empty."""
        if not series:
            return 0, 0, 0, 0
        return (
            round(min(series) / scale),
            round(sum(series) / len(series) / scale),
            # np.percentile does not require pre-sorted input (original sorted needlessly).
            round(np.percentile(series, 95) / scale),
            round(max(series) / scale),
        )

    @staticmethod
    def __recommend(volume, p95_iops, p95_throughput):
        """Return (is_optimal, optimal_type, optimal_iops, optimal_throughput_mb)
        for *volume* given its observed p95 usage.

        The gp3 baselines (3000 IOPS / 125 MB/s) are the floor for every
        recommendation. NOTE(review): st1/sc1/standard volumes fall through
        every branch and are reported as optimal — confirm this is intended.
        """
        is_optimal = 1
        optimal_type = 'gp3'
        optimal_iops = 3000
        optimal_throughput = 125
        vtype = volume['VolumeType']
        if vtype == 'gp2':
            # gp2 is always flagged: gp3 is cheaper at equal performance.
            is_optimal = 0
            optimal_iops = max(3000, p95_iops)
            optimal_throughput = max(125, p95_throughput)
        elif vtype == 'gp3':
            # Over-provisioned: paying above baseline while usage stays below it.
            if volume['Iops'] > 3000 and p95_iops < 3000:
                is_optimal = 0
            if volume['Throughput'] > 125 and p95_throughput < 125:
                is_optimal = 0
            # Under-provisioned: p95 usage exceeds what is provisioned.
            if p95_iops > volume['Iops']:
                is_optimal = 0
                optimal_iops = max(p95_iops, 3000)
            if p95_throughput > volume['Throughput']:
                is_optimal = 0
                optimal_throughput = max(p95_throughput, 125)
        elif vtype in ('io1', 'io2'):
            if p95_iops <= 16000:
                # Workload fits within gp3's maximum IOPS, so gp3 is cheaper.
                is_optimal = 0
                optimal_iops = max(3000, p95_iops)
                optimal_throughput = max(125, p95_throughput)
            else:
                if volume['Iops'] > 3000 and p95_iops < 3000:
                    is_optimal = 0
                if p95_iops > volume['Iops']:
                    is_optimal = 0
                    optimal_iops = max(p95_iops, 3000)
                # FIX: io1/io2 volumes carry no 'Throughput' attribute; the
                # original raised KeyError here. Only check when present.
                provisioned_tp = volume.get('Throughput')
                if provisioned_tp is not None:
                    if provisioned_tp > 125 and p95_throughput < 125:
                        is_optimal = 0
                    if p95_throughput > provisioned_tp:
                        is_optimal = 0
                        optimal_throughput = max(p95_throughput, 125)
        return is_optimal, optimal_type, optimal_iops, optimal_throughput

    def __cloudwatch_request(self, data, cloudwatch_client, metric_name, volume_id):
        """Fetch 30 days of 300-second 'Sum' datapoints for one EBS metric and
        store the value list in data[metric_name]. Runs in a worker thread;
        *data* is a plain dict (distinct keys per thread, so no lock needed)."""
        response = cloudwatch_client.get_metric_data(
            MetricDataQueries=[
                {
                    'Id': 'cloudwatch_metric',
                    'MetricStat': {
                        'Metric': {
                            'Namespace': 'AWS/EBS',
                            'MetricName': metric_name,
                            'Dimensions': [
                                {
                                    'Name': 'VolumeId',
                                    'Value': volume_id
                                },
                            ]
                        },
                        'Period': 300,
                        'Stat': 'Sum',
                        # Byte metrics must be requested in Bytes, ops metrics in Count.
                        'Unit': 'Bytes' if metric_name in ['VolumeReadBytes','VolumeWriteBytes'] else 'Count'
                    },
                    'ReturnData': True
                }
            ],
            StartTime=self._start_time,
            EndTime=self._end_time
        )
        data[metric_name] = response['MetricDataResults'][0]['Values']
# Run the scan when executed as a script (instantiating main does all the work).
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment