Skip to content

Instantly share code, notes, and snippets.

@dinos80152
Created November 19, 2020 10:27
Show Gist options
  • Save dinos80152/7b10e74c93fac70d917520184b61d89a to your computer and use it in GitHub Desktop.
Save dinos80152/7b10e74c93fac70d917520184b61d89a to your computer and use it in GitHub Desktop.
Generate S3 Usage Report
# Find unused S3 buckets by comparing the minimum and maximum number of objects
import boto3
from datetime import datetime
import csv
# Reporting window: the script passes these values as StartTime / EndTime to
# every CloudWatch metric query it makes.
# NOTE(review): both datetimes are naive (no tzinfo); presumably treated as UTC
# by boto3 -- confirm.
START_TIME = datetime(2019, 3, 1)
END_TIME = datetime(2019, 6, 1)
def get_bucket_names():
    """Return the names of the buckets reported by S3 ``list_buckets``.

    Returns:
        A list with the ``Name`` field of every entry in the response's
        ``Buckets`` list, in the order S3 returned them.
    """
    client = boto3.client('s3')
    buckets = client.list_buckets()["Buckets"]
    return [entry["Name"] for entry in buckets]
def get_bucket_location(bucket_name):
    """Return the AWS region name that hosts ``bucket_name``.

    The raw ``LocationConstraint`` reported by S3 is not always a usable
    region name: buckets in us-east-1 report a null constraint and some
    legacy buckets in eu-west-1 report ``"EU"``. Both are normalised here so
    the result can be passed straight to ``boto3.client(..., region_name=...)``.

    Args:
        bucket_name: Name of the bucket to look up.

    Returns:
        The region name as a string (never ``None``).
    """
    s3 = boto3.client('s3')
    constraint = s3.get_bucket_location(Bucket=bucket_name)["LocationConstraint"]
    if not constraint:
        # S3 reports the us-east-1 region as a null LocationConstraint.
        return "us-east-1"
    if constraint == "EU":
        # Legacy alias that S3 still returns for some eu-west-1 buckets.
        return "eu-west-1"
    return constraint
def get_bucket_min_max_objects(bucket_name, region, start_time, end_time):
    """Return the lowest and highest object count of a bucket in a time window.

    Queries the CloudWatch ``AWS/S3`` metric ``NumberOfObjects`` (storage type
    ``AllStorageTypes``) using a single period that spans the whole window.

    Args:
        bucket_name: Bucket whose metric is queried.
        region: Region name used to create the CloudWatch client.
        start_time: ``datetime`` marking the start of the window.
        end_time: ``datetime`` marking the end of the window.

    Returns:
        A ``(minimum, maximum)`` tuple, or ``(-1, -1)`` when CloudWatch
        returned no datapoints for the window.
    """
    client = boto3.client('cloudwatch', region_name=region)
    response = client.get_metric_statistics(
        Namespace="AWS/S3",
        MetricName="NumberOfObjects",
        Dimensions=[
            {
                'Name': 'StorageType',
                'Value': 'AllStorageTypes',
            },
            {
                'Name': 'BucketName',
                'Value': bucket_name,
            },
        ],
        StartTime=start_time,
        EndTime=end_time,
        # One period covering the whole window, so CloudWatch aggregates it.
        Period=int((end_time - start_time).total_seconds()),
        Statistics=['Minimum', 'Maximum'],
    )
    datapoints = response["Datapoints"]
    if not datapoints:
        return -1, -1
    # CloudWatch may return more than one datapoint (and in no particular
    # order) if the window straddles a period boundary, so aggregate across
    # all of them instead of trusting the first entry.
    lowest = min(point['Minimum'] for point in datapoints)
    highest = max(point['Maximum'] for point in datapoints)
    return lowest, highest
def get_bucket_max_size(bucket_name, region, start_time, end_time):
    """Return the largest recorded size, in bytes, of a bucket in a time window.

    Queries the CloudWatch ``AWS/S3`` metric ``BucketSizeBytes`` using a single
    period that spans the whole window. Only the ``StandardStorage`` storage
    type dimension is queried.

    Args:
        bucket_name: Bucket whose metric is queried.
        region: Region name used to create the CloudWatch client.
        start_time: ``datetime`` marking the start of the window.
        end_time: ``datetime`` marking the end of the window.

    Returns:
        The maximum metric value, or ``0`` when CloudWatch returned no
        datapoints for the window.
    """
    client = boto3.client('cloudwatch', region_name=region)
    response = client.get_metric_statistics(
        Namespace="AWS/S3",
        MetricName="BucketSizeBytes",
        Dimensions=[
            {
                'Name': 'StorageType',
                'Value': 'StandardStorage',
            },
            {
                'Name': 'BucketName',
                'Value': bucket_name,
            },
        ],
        StartTime=start_time,
        EndTime=end_time,
        # One period covering the whole window, so CloudWatch aggregates it.
        Period=int((end_time - start_time).total_seconds()),
        Statistics=['Maximum'],
    )
    datapoints = response["Datapoints"]
    if not datapoints:
        return 0
    # Datapoints are unordered and there may be more than one; take the
    # overall maximum rather than whichever entry happens to come first.
    return max(point['Maximum'] for point in datapoints)
def check_unused(min_objs, max_objs):
    """Tell whether a bucket's object count stayed constant.

    Args:
        min_objs: Lowest object count observed.
        max_objs: Highest object count observed.

    Returns:
        ``True`` when both counts are equal, ``False`` otherwise.
    """
    return min_objs == max_objs
def get_bytes_with_unit(size):
    """Format a byte count with a decimal (power-of-1000) unit suffix.

    The value is divided by 1000 until it drops below 1000, then rendered
    with ``str()`` followed by the matching unit. Values of 1000 T and above
    are expressed in ``T`` (the largest known unit) instead of falling off
    the end of the loop and returning ``None``.

    Args:
        size: Size in bytes (int or float).

    Returns:
        A string such as ``"999bytes"``, ``"1.5K"`` or ``"2.5M"``.
    """
    units = ["bytes", "K", "M", "G", "T"]
    for unit in units[:-1]:
        # Float divisor keeps the result exact-ish on Python 2 as well,
        # where int / int would silently truncate.
        scaled = size / 1000.0
        if scaled < 1:
            return str(size) + unit
        size = scaled
    # Anything that survived every division is reported in the largest unit.
    return str(size) + units[-1]
if __name__ == "__main__":
    # Build a per-bucket usage summary, print it, and write it to report.csv
    # ordered from the largest bucket to the smallest.
    start_time, end_time = START_TIME, END_TIME

    bucket_dict = {}
    for name in get_bucket_names():
        region = get_bucket_location(name)
        min_objs, max_objs = get_bucket_min_max_objects(
            name, region, start_time, end_time)
        size = get_bucket_max_size(name, region, start_time, end_time)
        bucket_dict[name] = {
            "region": region,
            "size": size,
            "objects": max_objs,
            # "v" marks buckets whose object count never changed in the window.
            "unused": "v" if check_unused(min_objs, max_objs) else "",
        }
    print(bucket_dict)

    # NOTE: on Python 3 the csv module recommends opening the file with
    # newline='' to avoid blank rows on Windows; left as-is so the script
    # keeps running on Python 2 too.
    with open('report.csv', 'w') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        writer.writerow(["bucket name", "region", "objects", "size", "unused"])
        # Tuple-parameter lambdas (``lambda (k, v): ...``) are a SyntaxError
        # on Python 3, so index into the (name, info) pair instead.
        largest_first = sorted(
            bucket_dict.items(),
            key=lambda item: item[1]["size"],
            reverse=True,
        )
        for key, value in largest_first:
            writer.writerow([
                key,
                value["region"],
                value["objects"],
                get_bytes_with_unit(value["size"]),
                value["unused"],
            ])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment