Skip to content

Instantly share code, notes, and snippets.

@alukach
Created January 30, 2024 20:29
Show Gist options
  • Save alukach/732dcc2ce78e8c30c2d51cd88cf594d0 to your computer and use it in GitHub Desktop.
Save alukach/732dcc2ce78e8c30c2d51cd88cf594d0 to your computer and use it in GitHub Desktop.
List bytes and objects per storage type for all buckets in an AWS account
import boto3
import csv
import threading
from datetime import datetime, timedelta
# CloudWatch "StorageType" dimension values queried for every bucket.
# The original list skipped the Intelligent-Tiering archive tiers, Glacier
# Instant Retrieval and Deep Archive, so data held in those classes never
# showed up in the report.
storage_types = [
    "StandardStorage",
    "IntelligentTieringFAStorage",
    "IntelligentTieringIAStorage",
    "IntelligentTieringAAStorage",
    "IntelligentTieringAIAStorage",
    "IntelligentTieringDAAStorage",
    "StandardIAStorage",
    "OneZoneIAStorage",
    "ReducedRedundancyStorage",
    "GlacierInstantRetrievalStorage",
    "GlacierStorage",
    "DeepArchiveStorage",
]

# Semaphore capping how many workers talk to CloudWatch at the same time
thread_limiter = threading.BoundedSemaphore(value=10)

# Start of the metrics time window (naive UTC, 7 days before script start)
start_time = datetime.utcnow() - timedelta(days=7)
def bytes_to_human_readable(num_bytes):
    """
    Convert a byte count to a human-readable string using 1024-based units.

    Args:
        num_bytes: Number of bytes (int or float, may be negative).

    Returns:
        The value scaled to the largest unit in which its magnitude is
        below 1024, formatted with one decimal place, e.g. "1.5 KB".
        Values of 1024 YB and above are still expressed in YB.
    """
    # "YB" is deliberately left out of the loop: it is the fallback unit, and
    # dividing once more before labelling the result "YB" would under-report
    # huge values by a factor of 1024.
    for unit in ["bytes", "KB", "MB", "GB", "TB", "PB", "EB", "ZB"]:
        if abs(num_bytes) < 1024.0:
            return f"{num_bytes:3.1f} {unit}"
        num_bytes /= 1024.0
    return f"{num_bytes:.1f} YB"
def _latest_daily_average(cloudwatch, metric_name, bucket, storage_type):
    """
    Return the most recent daily "Average" of an AWS/S3 metric as an int.

    Queries the window from the module-level ``start_time`` until now with a
    one-day period. Returns 0 when CloudWatch has no datapoints for the
    bucket/storage-type pair.
    """
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/S3",
        MetricName=metric_name,
        Dimensions=[
            {"Name": "BucketName", "Value": bucket},
            {"Name": "StorageType", "Value": storage_type},
        ],
        StartTime=start_time,
        EndTime=datetime.utcnow(),
        Period=86400,
        Statistics=["Average"],
    )
    datapoints = response["Datapoints"]
    if not datapoints:
        return 0
    # CloudWatch does not return datapoints in chronological order, so pick
    # the newest one explicitly instead of trusting index 0.
    newest = max(datapoints, key=lambda point: point["Timestamp"])
    return int(newest["Average"])


def fetch_and_write_metrics(bucket, storage_type, writer_lock, region):
    """
    Fetch size and object count for one bucket/storage type and write a CSV row.

    Relies on the module-level ``thread_limiter`` (concurrency cap),
    ``start_time`` (window start) and ``writer`` (shared csv writer).

    Args:
        bucket: Name of the S3 bucket.
        storage_type: CloudWatch "StorageType" dimension value.
        writer_lock: Lock serializing access to the shared csv writer.
        region: Region whose CloudWatch endpoint is queried.

    Returns:
        None. A row is written only when size or object count is non-zero.
    """
    with thread_limiter:
        print(f"{region}/{bucket}/{storage_type}: Fetching...")

        # Each call builds its own session/client bound to the given region
        session = boto3.Session()
        cloudwatch = session.client("cloudwatch", region_name=region)

        # Bucket size for the storage type
        size = _latest_daily_average(
            cloudwatch, "BucketSizeBytes", bucket, storage_type
        )

        # Object count for the storage type
        # NOTE(review): AWS documents NumberOfObjects only under
        # StorageType=AllStorageTypes, so this per-type query may always come
        # back empty -- confirm against the S3 CloudWatch metrics reference.
        object_count = _latest_daily_average(
            cloudwatch, "NumberOfObjects", bucket, storage_type
        )

        # Skip combinations that hold no data at all
        if not (object_count or size):
            print(f"{region}/{bucket}/{storage_type}: Ignored.")
            return

        row = [
            region,
            bucket,
            storage_type,
            size,
            bytes_to_human_readable(size),
            object_count,
        ]
        # csv writers are shared between worker threads; serialize the write
        with writer_lock:
            writer.writerow(row)
        print(f"{region}/{bucket}/{storage_type}: Written.")
# Regions returned by EC2 for this account; CloudWatch is only queried in these
regions = [
    region["RegionName"] for region in boto3.client("ec2").describe_regions()["Regions"]
]


def _regions_to_query(s3_client, bucket):
    """
    Return the list of regions whose CloudWatch should be asked about *bucket*.

    Normally this is just the bucket's home region. If the region cannot be
    resolved, every known region is returned so the bucket is still probed
    everywhere, as the original script did.
    """
    try:
        location = s3_client.get_bucket_location(Bucket=bucket)["LocationConstraint"]
    except Exception as e:
        # Best effort: a missing permission must not drop the bucket silently
        print(f"Could not resolve region for bucket {bucket} ({e}); trying all regions.")
        return regions
    # S3 reports us-east-1 as None and, for legacy buckets, eu-west-1 as "EU"
    if location is None:
        location = "us-east-1"
    elif location == "EU":
        location = "eu-west-1"
    if location not in regions:
        print(f"Skipping bucket {bucket}: region {location} was not returned by describe_regions.")
        return []
    return [location]


# Prepare to write to CSV
with open("s3_metrics.csv", mode="w", newline="") as file:
    # `writer` must stay a module-level name: fetch_and_write_metrics uses it
    writer = csv.writer(file)
    writer.writerow(
        [
            "region",
            "bucket_name",
            "storage_type",
            "num_of_bytes",
            "human_readable_size",
            "num_of_objects",
        ]
    )

    # Lock for synchronizing CSV writer
    writer_lock = threading.Lock()

    # list_buckets returns every bucket of the account regardless of the
    # client's region, so call it once instead of once per region.
    s3 = boto3.client("s3")
    try:
        buckets = [bucket["Name"] for bucket in s3.list_buckets()["Buckets"]]
    except Exception as e:
        print(f"Error listing buckets: {e}")
        buckets = []

    # One worker per bucket / region / storage type; S3 storage metrics live
    # in the bucket's own region, so other regions are not probed.
    threads = []
    for bucket in buckets:
        for region in _regions_to_query(s3, bucket):
            for st in storage_types:
                thread = threading.Thread(
                    target=fetch_and_write_metrics,
                    args=(bucket, st, writer_lock, region),
                )
                threads.append(thread)
                thread.start()

    # Wait for all threads to complete while the file is still open
    for thread in threads:
        thread.join()

print("CSV file created successfully.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment