List bytes and objects per storage type for all buckets in an AWS account

import boto3
import csv
import threading
from datetime import datetime, timedelta

# S3 StorageType dimension values to query; this covers the common classes,
# and others (e.g., DeepArchiveStorage, GlacierInstantRetrievalStorage) can
# be appended as needed
storage_types = [
    "StandardStorage",
    "IntelligentTieringFAStorage",
    "IntelligentTieringIAStorage",
    "StandardIAStorage",
    "OneZoneIAStorage",
    "ReducedRedundancyStorage",
    "GlacierStorage",
]

# Semaphore to limit the number of concurrent CloudWatch requests
thread_limiter = threading.BoundedSemaphore(value=10)

# Start of the time window for the CloudWatch queries (the last 7 days)
start_time = datetime.utcnow() - timedelta(days=7)
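# Note: S3 publishes its storage metrics (BucketSizeBytes, NumberOfObjects)
# to CloudWatch roughly once per day, so a 7-day lookback with a daily
# period (86400 s) should yield at least one datapoint per bucket/storage
# type pair.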

def bytes_to_human_readable(num_bytes):
    """
    Convert bytes to a human-readable format.
    """
    for unit in ["bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]:
        if abs(num_bytes) < 1024.0:
            return f"{num_bytes:3.1f} {unit}"
        num_bytes /= 1024.0
    return f"{num_bytes:.1f} YB"
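
# Example: bytes_to_human_readable(1536) returns "1.5 KB"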

def fetch_and_write_metrics(bucket, storage_type, writer_lock, region):
    """Fetch size and object-count metrics for one bucket/storage-type pair
    and append a CSV row. Relies on the module-level `writer` defined below."""
    with thread_limiter:
        print(f"{region}/{bucket}/{storage_type}: Fetching...")

        # Initialize a per-region CloudWatch client
        session = boto3.Session()
        cloudwatch = session.client("cloudwatch", region_name=region)

        # Get the bucket size for the storage type
        size_response = cloudwatch.get_metric_statistics(
            Namespace="AWS/S3",
            MetricName="BucketSizeBytes",
            Dimensions=[
                {"Name": "BucketName", "Value": bucket},
                {"Name": "StorageType", "Value": storage_type},
            ],
            StartTime=start_time,
            EndTime=datetime.utcnow(),
            Period=86400,
            Statistics=["Average"],
        )
        size = int(
            size_response["Datapoints"][0]["Average"]
            if size_response["Datapoints"]
            else 0
        )

        # Get the object count for the storage type. Note: CloudWatch
        # publishes NumberOfObjects only under the "AllStorageTypes"
        # dimension, so per-storage-type queries may return no datapoints
        # (an object count of 0).
        count_response = cloudwatch.get_metric_statistics(
            Namespace="AWS/S3",
            MetricName="NumberOfObjects",
            Dimensions=[
                {"Name": "BucketName", "Value": bucket},
                {"Name": "StorageType", "Value": storage_type},
            ],
            StartTime=start_time,
            EndTime=datetime.utcnow(),
            Period=86400,
            Statistics=["Average"],
        )
        object_count = (
            count_response["Datapoints"][0]["Average"]
            if count_response["Datapoints"]
            else 0
        )

        # Skip the row entirely when both the object count and size are zero
        if not any([object_count, size]):
            print(f"{region}/{bucket}/{storage_type}: Ignored.")
            return

        row = [
            region,
            bucket,
            storage_type,
            size,
            bytes_to_human_readable(size),
            object_count,
        ]
        with writer_lock:
            writer.writerow(row)
        print(f"{region}/{bucket}/{storage_type}: Written.")

# Get all enabled regions for the account (via the EC2 DescribeRegions API)
regions = [
    region["RegionName"] for region in boto3.client("ec2").describe_regions()["Regions"]
]
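# Note: list_buckets() returns every bucket in the account regardless of the
# client's region, while CloudWatch stores a bucket's S3 metrics only in the
# bucket's home region. Mismatched region/bucket pairs below simply return no
# datapoints and are reported as "Ignored".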

# Prepare to write to CSV
with open("s3_metrics.csv", mode="w", newline="") as file:
    writer = csv.writer(file)
    writer.writerow(
        [
            "region",
            "bucket_name",
            "storage_type",
            "num_of_bytes",
            "human_readable_size",
            "num_of_objects",
        ]
    )

    # Lock for synchronizing the CSV writer
    writer_lock = threading.Lock()

    # Create a thread for each region/bucket/storage-type combination
    threads = []
    for region in regions:
        # Initialize a per-region S3 client
        session = boto3.Session()
        s3 = session.client("s3", region_name=region)

        # List the account's buckets (account-wide; see the note above)
        try:
            response = s3.list_buckets()
            buckets = [bucket["Name"] for bucket in response["Buckets"]]
            for bucket in buckets:
                for st in storage_types:
                    thread = threading.Thread(
                        target=fetch_and_write_metrics,
                        args=(bucket, st, writer_lock, region),
                    )
                    threads.append(thread)
                    thread.start()
        except Exception as e:
            print(f"Error processing region {region}: {e}")

    # Wait for all threads to complete before the file is closed
    for thread in threads:
        thread.join()

print("CSV file created successfully.")