@jesperalmstrom
Forked from rwiggins/s3_bucket_stats.py
Last active September 19, 2019 09:04
Lists the storage size (for all storage types) and the tags of every S3 bucket in an account using CloudWatch's GetMetricStatistics. Uses the default AWS credentials in your environment (see the awscli configuration documentation for more information). Pass your profile name as an argument when running.
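A minimal invocation sketch, assuming the script is saved as s3_bucket_stats.py (the filename of the gist it was forked from) and that boto3 and a progressbar package (for example progressbar2, which installs the `progressbar` module this script imports) are available:

    pip install boto3 progressbar2
    python s3_bucket_stats.py my-profile

Here my-profile is a placeholder for any profile name defined in your AWS credentials/config files.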
import boto3
import botocore
import datetime
import progressbar
import itertools
import argparse
# from https://docs.aws.amazon.com/AmazonS3/latest/dev/cloudwatch-monitoring.html
STORAGE_TYPES = [
    "StandardStorage",
    "IntelligentTieringStorage",
    "StandardIAStorage",
    "StandardIASizeOverhead",
    "StandardIAObjectOverhead",
    "OneZoneIAStorage",
    "OneZoneIASizeOverhead",
    "ReducedRedundancyStorage",
    "GlacierStorage",
    "GlacierStagingStorage",
    "GlacierObjectOverhead",
    "GlacierS3ObjectOverhead",
    "DeepArchiveStorage",
    "DeepArchiveObjectOverhead",
    "DeepArchiveS3ObjectOverhead",
    "DeepArchiveStagingStorage"
]
now = datetime.datetime.now()
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("profile", type=str, help="The aws profile to use")
args = parser.parse_args()
# Configure the default boto3 session with the chosen profile
boto3.setup_default_session(profile_name=args.profile)
cw = boto3.client('cloudwatch')
s3client = boto3.client('s3')
# Get a list of all buckets
allbuckets = s3client.list_buckets()
# Grab all the bucket tags for pretty display
print('Getting bucket tags...')
tag_bar = progressbar.ProgressBar()
bucket_tags = {}
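# get_bucket_tagging raises botocore.exceptions.ClientError (error code
# NoSuchTagSet) when a bucket has no tags; fall back to an empty tag list.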
for bucket in tag_bar(allbuckets['Buckets']):
    try:
        bucket_tagging = s3client.get_bucket_tagging(Bucket=bucket['Name'])
        bucket_tags[bucket['Name']] = bucket_tagging['TagSet']
    except botocore.exceptions.ClientError:
        bucket_tags[bucket['Name']] = []
# Look up the storage for each storage type from CloudWatch
print('Calculating storage usage...')
bucket_info = []
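# BucketSizeBytes is one of S3's daily storage metrics, so a one-day window
# ending yesterday should contain at most one datapoint per storage type.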
start_time = now - datetime.timedelta(days=2)
end_time = now - datetime.timedelta(days=1)
main_bar = progressbar.ProgressBar()
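# Pair every bucket with every storage type so each combination is queried once.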
bucket_storages = list(itertools.product(allbuckets['Buckets'], STORAGE_TYPES))
for bucket, storage_type in main_bar(bucket_storages):
    metrics = cw.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[
            {'Name': 'BucketName', 'Value': bucket['Name']},
            {'Name': 'StorageType', 'Value': storage_type}
        ],
        Statistics=['Average'],
        Period=3600,
        StartTime=start_time.isoformat(),
        EndTime=end_time.isoformat()
    )
    # The CloudWatch response holds at most one datapoint per bucket and
    # storage type, so we simply report each one we find.
    for item in metrics["Datapoints"]:
        size_bytes = int(item['Average'])
        size_gb = size_bytes / 1024 / 1024 / 1024
        bucket_info.append({
            'name': '{} ({})'.format(bucket['Name'], storage_type),
            'size': size_gb,
            'tags': bucket_tags[bucket['Name']]
        })
# Pretty looking tags
def format_tags(tags):
    return ', '.join([
        '{}={}'.format(tag['Key'], tag['Value'])
        for tag in tags
    ])
# Header line for the table written to standard out
format_string = '{: <75} {: >15} {: <100}'
print(format_string.format('Bucket', 'Size (GiB)', 'Tags'))
for bucket in sorted(bucket_info, key=lambda x: x['size'], reverse=True):
    print(format_string.format(
        bucket['name'],
        '{:.2f}'.format(bucket['size']),
        format_tags(bucket['tags'])))