Skip to content

Instantly share code, notes, and snippets.

@lloesche
Created February 15, 2024 17:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lloesche/6710898dbda268b626819d88f754cc9c to your computer and use it in GitHub Desktop.
Save lloesche/6710898dbda268b626819d88f754cc9c to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import boto3
def cpu_usage_metrics_for_instance(
    region: str,
    instance_id: str,
    start_time: datetime,
    duration: timedelta,
    *,
    cloudwatch_client: Optional[Any] = None,
) -> Dict[str, Dict[str, Optional[float]]]:
    """Summarize an EC2 instance's CPU utilization over a time window.

    Queries the ``CPUUtilization`` metric of the ``AWS/EC2`` namespace for the
    given ``InstanceId`` and collapses the returned datapoints into a single
    minimum, maximum and average.

    Args:
        region: AWS region used when a CloudWatch client has to be created.
        instance_id: Value of the ``InstanceId`` dimension to query.
        start_time: Beginning of the window.
        duration: Length of the window; the window ends at
            ``start_time + duration``.
        cloudwatch_client: Optional object exposing ``get_metric_statistics``.
            When omitted, a boto3 CloudWatch client is created for ``region``.
            Passing one lets callers reuse a client across several queries.

    Returns:
        ``{"CPUUtilization": {"Minimum": m, "Maximum": M, "Average": a}}`` with
        values rounded to 4 decimals. A statistic is ``None`` when no datapoint
        carried it. ``Average`` is the unweighted mean of the per-period
        averages.
    """
    metric_name = "CPUUtilization"
    end_time = start_time + duration

    # Coarser buckets for longer windows keep the number of datapoints small:
    # 1-minute buckets up to an hour, 5-minute buckets up to a day, and one
    # hour per whole day of window beyond that.
    # NOTE(review): the choice depends only on the window length; CloudWatch
    # is documented to need coarser periods for old start times — confirm.
    window_seconds = int(duration.total_seconds())
    if window_seconds <= 3600:
        period = 60
    elif window_seconds <= 86400:
        period = 300
    else:
        period = (window_seconds // 86400) * 3600

    if cloudwatch_client is not None:
        cloudwatch = cloudwatch_client
    else:
        cloudwatch = boto3.client("cloudwatch", region_name=region)

    # How each requested statistic is folded across the returned datapoints.
    aggregators = {
        "Minimum": min,
        "Maximum": max,
        "Average": lambda values: sum(values) / len(values),
    }

    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/EC2",
        MetricName=metric_name,
        Dimensions=[{"Name": "InstanceId", "Value": instance_id}],
        StartTime=start_time,
        EndTime=end_time,
        Period=period,
        Statistics=list(aggregators),
    )
    datapoints = response.get("Datapoints") or []

    summary: Dict[str, Optional[float]] = {stat: None for stat in aggregators}
    for stat, aggregate in aggregators.items():
        values = [point[stat] for point in datapoints if stat in point]
        if values:
            summary[stat] = round(aggregate(values), 4)

    return {metric_name: summary}
def disk_usage_metrics_for_volume(
    region: str,
    volume_id: str,
    start_time: datetime,
    duration: timedelta,
    *,
    cloudwatch_client: Optional[Any] = None,
) -> Dict[str, Dict[str, float]]:
    """Summarize an EBS volume's throughput and IOPS over a time window.

    Fetches the per-period ``Sum`` of the read/write byte and operation
    metrics in the ``AWS/EBS`` namespace and converts every sum into a
    per-second rate by dividing by the period length.

    Args:
        region: AWS region used when a CloudWatch client has to be created.
        volume_id: Value of the ``VolumeId`` dimension to query.
        start_time: Beginning of the window.
        duration: Length of the window; the window ends at
            ``start_time + duration``.
        cloudwatch_client: Optional object exposing ``get_metric_statistics``.
            When omitted, a boto3 CloudWatch client is created for ``region``.

    Returns:
        A dict keyed ``ReadThroughputMBps``, ``WriteThroughputMBps``,
        ``ReadIOPS`` and ``WriteIOPS``. Each value is a dict with ``Minimum``,
        ``Maximum`` and ``Average`` rounded to 4 decimals; all three are
        ``0.0`` when no datapoints were returned.
    """
    end_time = start_time + duration
    if cloudwatch_client is not None:
        cloudwatch = cloudwatch_client
    else:
        cloudwatch = boto3.client("cloudwatch", region_name=region)

    # A single GetMetricStatistics call may return at most 1,440 datapoints,
    # so 1-minute buckets only work for windows of up to 24 hours. Longer
    # windows get the smallest whole-minute period that stays within the cap.
    max_datapoints = 1440
    window_seconds = int(duration.total_seconds())
    minutes_per_bucket = -(-window_seconds // (max_datapoints * 60))  # ceiling division
    period = max(60, minutes_per_bucket * 60)

    # (CloudWatch metric, unit filter, result key, divisor). The divisor turns
    # a per-period sum into a per-second rate; byte sums are also scaled to MB.
    bytes_per_mb = 1024 * 1024
    metric_specs = [
        ("VolumeReadBytes", "Bytes", "ReadThroughputMBps", period * bytes_per_mb),
        ("VolumeWriteBytes", "Bytes", "WriteThroughputMBps", period * bytes_per_mb),
        ("VolumeReadOps", "Count", "ReadIOPS", period),
        ("VolumeWriteOps", "Count", "WriteIOPS", period),
    ]

    results: Dict[str, Dict[str, float]] = {}
    for metric_name, unit, result_key, divisor in metric_specs:
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/EBS",
            MetricName=metric_name,
            Dimensions=[{"Name": "VolumeId", "Value": volume_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=["Sum"],
            Unit=unit,
        )
        sums = [point["Sum"] for point in response["Datapoints"]]
        if not sums:
            results[result_key] = {"Minimum": 0.0, "Maximum": 0.0, "Average": 0.0}
            continue
        mean_sum = sum(sums) / len(sums)
        results[result_key] = {
            "Minimum": round(min(sums) / divisor, 4),
            "Maximum": round(max(sums) / divisor, 4),
            "Average": round(mean_sum / divisor, 4),
        }
    return results
def rds_metrics_for_instance(
    region: str,
    db_instance_identifier: str,
    start_time: datetime,
    duration: timedelta,
    *,
    cloudwatch_client: Optional[Any] = None,
) -> Dict[str, Dict[str, float]]:
    """Summarize key CloudWatch metrics of an RDS instance over a time window.

    For each of ``CPUUtilization``, ``DatabaseConnections``, ``ReadIOPS``,
    ``WriteIOPS``, ``ReadLatency``, ``WriteLatency`` and ``FreeStorageSpace``
    in the ``AWS/RDS`` namespace, the per-period minimum, maximum and average
    are fetched and folded into one value each.

    Args:
        region: AWS region used when a CloudWatch client has to be created.
        db_instance_identifier: Value of the ``DBInstanceIdentifier`` dimension.
        start_time: Beginning of the window.
        duration: Length of the window; the window ends at
            ``start_time + duration``.
        cloudwatch_client: Optional object exposing ``get_metric_statistics``.
            When omitted, a boto3 CloudWatch client is created for ``region``.

    Returns:
        A dict keyed by metric name. Each value is a dict with ``Minimum``,
        ``Maximum`` and ``Average`` rounded to 4 decimals (``Average`` is the
        unweighted mean of the per-period averages); all three are ``0.0``
        when no datapoints were returned.
    """
    end_time = start_time + duration
    if cloudwatch_client is not None:
        cloudwatch = cloudwatch_client
    else:
        cloudwatch = boto3.client("cloudwatch", region_name=region)

    # A single GetMetricStatistics call may return at most 1,440 datapoints,
    # so 1-minute buckets only work for windows of up to 24 hours. Longer
    # windows get the smallest whole-minute period that stays within the cap.
    max_datapoints = 1440
    window_seconds = int(duration.total_seconds())
    minutes_per_bucket = -(-window_seconds // (max_datapoints * 60))  # ceiling division
    period = max(60, minutes_per_bucket * 60)

    metric_names = [
        "CPUUtilization",
        "DatabaseConnections",
        "ReadIOPS",
        "WriteIOPS",
        "ReadLatency",
        "WriteLatency",
        "FreeStorageSpace",
    ]

    results: Dict[str, Dict[str, float]] = {}
    for metric_name in metric_names:
        # No Unit filter on purpose: CloudWatch returns nothing when the
        # requested unit differs from the unit the metric was published with,
        # and these metrics do not share one unit (rates, seconds, bytes, ...).
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/RDS",
            MetricName=metric_name,
            Dimensions=[{"Name": "DBInstanceIdentifier", "Value": db_instance_identifier}],
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=["Minimum", "Maximum", "Average"],
        )
        datapoints = response["Datapoints"]
        if datapoints:
            lowest = round(min(point["Minimum"] for point in datapoints), 4)
            highest = round(max(point["Maximum"] for point in datapoints), 4)
            mean = round(sum(point["Average"] for point in datapoints) / len(datapoints), 4)
        else:
            lowest = highest = mean = 0.0
        results[metric_name] = {"Minimum": lowest, "Maximum": highest, "Average": mean}
    return results
def ec2_network_metrics_combined(
    region: str,
    network_interface_id: str,
    start_time: datetime,
    duration: timedelta,
    *,
    cloudwatch_client: Optional[Any] = None,
) -> Dict[str, Dict[str, float]]:
    """Summarize network traffic and packet rates over a time window.

    Fetches the per-period ``Sum`` of ``NetworkIn``, ``NetworkOut``,
    ``NetworkPacketsIn`` and ``NetworkPacketsOut`` from the ``AWS/EC2``
    namespace and converts every sum into a per-second rate. Byte sums are
    converted to megabits using 1024**2 bits per megabit.

    NOTE(review): the query filters on a ``NetworkInterfaceId`` dimension. The
    EC2 metrics documentation appears to list only instance-level dimensions
    for this namespace, in which case every query returns no datapoints and
    all results are 0.0 — confirm before relying on these numbers.

    Args:
        region: AWS region used when a CloudWatch client has to be created.
        network_interface_id: Value sent as the ``NetworkInterfaceId`` dimension.
        start_time: Beginning of the window.
        duration: Length of the window; the window ends at
            ``start_time + duration``.
        cloudwatch_client: Optional object exposing ``get_metric_statistics``.
            When omitted, a boto3 CloudWatch client is created for ``region``.

    Returns:
        A dict keyed ``NetworkInMbps``, ``NetworkOutMbps``,
        ``NetworkPacketsInPPS`` and ``NetworkPacketsOutPPS``. Each value is a
        dict with ``Min``, ``Max`` and ``Avg`` rounded to 4 decimals; all
        three are ``0.0`` when no datapoints were returned.
    """
    end_time = start_time + duration
    if cloudwatch_client is not None:
        cloudwatch = cloudwatch_client
    else:
        cloudwatch = boto3.client("cloudwatch", region_name=region)

    # A single GetMetricStatistics call may return at most 1,440 datapoints,
    # so 1-minute buckets only work for windows of up to 24 hours. Longer
    # windows get the smallest whole-minute period that stays within the cap.
    max_datapoints = 1440
    window_seconds = int(duration.total_seconds())
    minutes_per_bucket = -(-window_seconds // (max_datapoints * 60))  # ceiling division
    period = max(60, minutes_per_bucket * 60)

    # (CloudWatch metric, suffix of the result key, whether the sum is in bytes)
    metric_specs = [
        ("NetworkIn", "Mbps", True),
        ("NetworkOut", "Mbps", True),
        ("NetworkPacketsIn", "PPS", False),
        ("NetworkPacketsOut", "PPS", False),
    ]

    results: Dict[str, Dict[str, float]] = {}
    for metric_name, unit_suffix, is_bytes in metric_specs:
        response = cloudwatch.get_metric_statistics(
            Namespace="AWS/EC2",
            MetricName=metric_name,
            Dimensions=[{"Name": "NetworkInterfaceId", "Value": network_interface_id}],
            StartTime=start_time,
            EndTime=end_time,
            Period=period,
            Statistics=["Sum"],
        )
        totals = [point["Sum"] for point in response["Datapoints"]]
        if is_bytes:
            # bytes -> bits -> megabits
            totals = [(value * 8 / (1024**2)) for value in totals]

        if totals:
            # Dividing a per-period total by the period length gives a rate per second.
            summary = {
                "Min": round(min(totals) / period, 4),
                "Max": round(max(totals) / period, 4),
                "Avg": round(sum(totals) / len(totals) / period, 4),
            }
        else:
            summary = {"Min": 0.0, "Max": 0.0, "Avg": 0.0}
        results[f"{metric_name}{unit_suffix}"] = summary
    return results
if __name__ == "__main__":
    # Demo run: print the last hour of metrics for a fixed set of resources.
    region = "us-east-2"
    instance_id = "i-046ab414885b48fee"
    volume_id = "vol-05808092af751c33b"
    eni_id = "eni-06d0753e24553733c"
    rds_id = "insecurestack-unencryptedrdsinstance-howslpcppcwp"

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value whose UTC meaning is only implicit.
    duration = timedelta(hours=1)
    start_time = datetime.now(timezone.utc) - duration

    print(f"{instance_id}: {cpu_usage_metrics_for_instance(region, instance_id, start_time, duration)}")
    print(f"{volume_id}: {disk_usage_metrics_for_volume(region, volume_id, start_time, duration)}")
    print(f"{rds_id}: {rds_metrics_for_instance(region, rds_id, start_time, duration)}")
    print(f"{eni_id}: {ec2_network_metrics_combined(region, eni_id, start_time, duration)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment