@jspalink
Created April 20, 2020 17:01
Provides a bit better insight into the storage costs associated with your S3 buckets.
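The script walks every bucket returned by list_buckets, pulls 30 days of BucketSizeBytes metrics from CloudWatch for the standard and Glacier storage classes, prices the average size against the AWS Price List API (with the price list cached in the temp directory for a week), and prints one report row per bucket.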
import boto3
import datetime
import json
from collections import namedtuple
import tempfile
import time
import statistics
import argparse
import os
import pickle
s3_client = boto3.client('s3')
cloudwatch_client = boto3.client('cloudwatch')
price_list = dict()
now = datetime.datetime.now()
s3bucket = namedtuple('s3bucket', ('name', 'tags', 'versioning', 'standard_size_start', 'standard_size_end', 'standard_size_avg', 'glacier_size_start', 'glacier_size_end', 'glacier_size_avg'))
tmpdir = tempfile.gettempdir()
aws_region_map = {
    'ca-central-1': 'Canada (Central)',
    'ap-northeast-3': 'Asia Pacific (Osaka-Local)',
    'us-east-1': 'US East (N. Virginia)',
    'ap-northeast-2': 'Asia Pacific (Seoul)',
    'us-gov-west-1': 'AWS GovCloud (US)',
    'us-east-2': 'US East (Ohio)',
    'ap-northeast-1': 'Asia Pacific (Tokyo)',
    'ap-south-1': 'Asia Pacific (Mumbai)',
    'ap-southeast-2': 'Asia Pacific (Sydney)',
    'ap-southeast-1': 'Asia Pacific (Singapore)',
    'sa-east-1': 'South America (Sao Paulo)',
    'us-west-2': 'US West (Oregon)',
    'eu-west-1': 'EU (Ireland)',
    'eu-west-3': 'EU (Paris)',
    'eu-west-2': 'EU (London)',
    'us-west-1': 'US West (N. California)',
    'eu-central-1': 'EU (Frankfurt)'
}
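# The Price List API identifies regions by these human-readable location names
# rather than by the short region codes boto3 uses, hence the translation map above.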
def build_pricing_defaults(pricing_client=None, region='us-east-1', *args, **kwargs):
    # Fetch the S3 price list (served from the cache file when it is still fresh)
    # and write it back to the cache file if the one on disk is missing or stale.
    pricing_client = pricing_client or boto3.client('pricing', region_name=region)
    s3_pricing = get_s3_pricing(pricing_client=pricing_client, region=region)
    file_path = build_pricing_path('aws_s3_prices', region)
    if not pricing_file_is_good(file_path):
        with open(file_path, 'wb') as f:
            pickle.dump(s3_pricing, f)
    return
def build_pricing_path(n, region='us-east-1'):
    return os.path.abspath(os.path.join(tmpdir, '{}-{}'.format(n, region)))
def pricing_file_is_good(file_path, ttl=604800):
    # The cached price list is considered fresh for `ttl` seconds (one week by default).
    return os.path.exists(file_path) and os.path.getctime(file_path) > (time.time() - ttl)
def get_existing(file_path, ttl=604800):
    if pricing_file_is_good(file_path, ttl):
        with open(file_path, 'rb') as f:
            return pickle.load(f)
def get_price_list(pricing_client, volume_type='Standard', location='US East (N. Virginia)', *args, **kwargs):
    price_list = []
    response = pricing_client.get_products(
        ServiceCode='AmazonS3',
        Filters=[
            {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': location},
            {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'},
            {'Type': 'TERM_MATCH', 'Field': 'volumeType', 'Value': volume_type}
        ]
    )
    for result in response['PriceList']:
        json_result = json.loads(result)
        for on_demand in json_result['terms']['OnDemand'].values():
            for price_dimensions in on_demand['priceDimensions'].values():
                begin = int(price_dimensions['beginRange'])
                end = None
                if price_dimensions['endRange'].isdecimal():
                    end = int(price_dimensions['endRange'])
                price_list.append((begin, end, float(price_dimensions['pricePerUnit']['USD'])))
    return sorted(price_list, key=lambda x: x[0])
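# get_price_list returns ascending (begin_gb, end_gb, usd_per_gb) tiers; the last
# tier has end=None because its endRange is "Inf", which isdecimal() rejects.
# Illustrative shape only -- the actual numbers come from the Price List API:
#   [(0, 51200, 0.023), (51200, 512000, 0.022), (512000, None, 0.021)]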
def get_s3_pricing(pricing_client=None, region='us-east-1', *args, **kwargs):
    """
    Returns a pricing dictionary for S3 pricing for this region
    """
    price_dictionary = get_existing(build_pricing_path('aws_s3_prices', region))
    if price_dictionary:
        return price_dictionary
    if not pricing_client:
        pricing_client = boto3.client('pricing', region_name=region)
    resolved_region = aws_region_map.get(region, 'US East (N. Virginia)')
    price_dictionary = dict()
    price_dictionary['glacier'] = get_price_list(pricing_client, volume_type='Amazon Glacier', location=resolved_region)
    price_dictionary['standard'] = get_price_list(pricing_client, volume_type='Standard', location=resolved_region)
    return price_dictionary
def get_bucket_datapoints(cloudwatch_client, bucket_name, storage_type='StandardStorage', statistic='Average'):
    response = cloudwatch_client.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[{'Name': 'BucketName', 'Value': bucket_name}, {'Name': 'StorageType', 'Value': storage_type}],
        Statistics=[statistic],
        Period=86400,
        StartTime=(now - datetime.timedelta(days=30)).isoformat(),
        EndTime=now.isoformat()
    )
    return response['Datapoints']
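# BucketSizeBytes is a daily storage metric, so Period=86400 over a 30-day window
# yields roughly one datapoint per day per storage class; buckets holding no data
# in a given class simply return no datapoints.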
def get_bucket_start_end_avg(datapoints, statistic='Average'):
    if not datapoints:
        return 0, 0, 0
    sorted_datapoints = sorted(datapoints, key=lambda x: x['Timestamp'])
    return sorted_datapoints[0][statistic], sorted_datapoints[-1][statistic], statistics.mean(s[statistic] for s in sorted_datapoints)
def get_bucket_tags(s3_client, bucket_name):
    # get_bucket_tagging raises a ClientError when the bucket has no tag set.
    try:
        tags = s3_client.get_bucket_tagging(Bucket=bucket_name)
        tags = {t['Key']: t['Value'] for t in tags['TagSet']}
    except s3_client.exceptions.ClientError:
        tags = dict()
    return tags
def get_bucket_tag(bucket, tag_name):
    return bucket.tags.get(tag_name, '')
def get_bucket_versioning(s3_client, bucket_name):
    # 'Status' is absent from the response when versioning has never been configured.
    status = False
    try:
        status = s3_client.get_bucket_versioning(Bucket=bucket_name).get('Status') == 'Enabled'
    except s3_client.exceptions.ClientError:
        pass
    return status
def get_bucket_info(s3_client, cloudwatch_client, bucket_name, bucket=None, *args, **kwargs):
    standard_storage_datapoints = get_bucket_datapoints(cloudwatch_client, bucket_name, 'StandardStorage')
    glacier_storage_datapoints = get_bucket_datapoints(cloudwatch_client, bucket_name, 'GlacierStorage')
    standard_start, standard_end, standard_avg = get_bucket_start_end_avg(standard_storage_datapoints)
    glacier_start, glacier_end, glacier_avg = get_bucket_start_end_avg(glacier_storage_datapoints)
    tags = get_bucket_tags(s3_client, bucket_name)
    versioning = get_bucket_versioning(s3_client, bucket_name)
    return s3bucket(bucket_name, tags, versioning, standard_start, standard_end, standard_avg, glacier_start, glacier_end, glacier_avg)
def yield_buckets_info(s3_client, cloudwatch_client):
    for bucket in s3_client.list_buckets()['Buckets']:
        yield get_bucket_info(s3_client, cloudwatch_client, bucket['Name'], bucket)
def bytes_to_gb(num_bytes):
    return num_bytes / 1024 / 1024 / 1024
def find_storage_cost(price_list, storage_class, storage_bytes):
    storage_gb = bytes_to_gb(storage_bytes)
    cost = [0]
    for tier in price_list[storage_class]:
        if storage_gb < tier[0]:
            # our storage amount never reaches this tier, so stop
            break
        charging_gb = storage_gb
        if tier[1] and tier[1] < storage_gb:
            # we have more GB than this tier covers, so charge only up to its upper bound
            charging_gb = tier[1]
        if tier[0] > 0:
            # subtract what has already been charged in the lower tiers
            charging_gb = charging_gb - tier[0]
        cost.append(charging_gb * tier[2])
    return sum(cost)
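# Worked example with the illustrative standard tiers above: 60,000 GB would be
# charged as 51,200 GB at $0.023 plus the remaining 8,800 GB at $0.022,
# i.e. 51200 * 0.023 + 8800 * 0.022 = $1,371.20 per month.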
def get_growth_factor(start, end):
    result = 0
    try:
        result = (end - start) / start
    except ZeroDivisionError:
        pass
    return result
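# e.g. get_growth_factor(100, 150) returns 0.5, rendered as 50.00% by the report;
# a start value of 0 (a bucket new to this storage class) yields 0 rather than a
# division error.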
def build_report(s3_client, cloudwatch_client, pricing=None):
    prices = pricing or price_list
    print("{:50} {:13} {:13} {:9} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12} {:>12}".format(
        "bucket",        # 50
        "division",      # 13
        "owner",         # 13
        "versioned",     # 9
        "avg size",      # 12
        "avg growth",    # 12
        "glacier size",  # 12
        "glcr growth",   # 12
        "std cost",      # 12
        "glacier cost",  # 12
        "total cost"     # 12
    ))
    for bucket in yield_buckets_info(s3_client, cloudwatch_client):
        # s3bucket fields: name, tags, versioning, standard_size_start/_end/_avg, glacier_size_start/_end/_avg
        std_cost = find_storage_cost(prices, 'standard', bucket.standard_size_avg)
        glr_cost = find_storage_cost(prices, 'glacier', bucket.glacier_size_avg)
        print("{:50} {:13} {:13} {:9} {:>10.1f}GB {:>12.2%} {:>10.1f}GB {:>12.2%} {:>12.2f} {:>12.2f} {:>12.2f}".format(
            bucket.name,                                                              # {:50}
            get_bucket_tag(bucket, 'division'),                                       # {:13}
            get_bucket_tag(bucket, 'owner'),                                          # {:13}
            str(bucket.versioning).lower(),                                           # {:9}
            bytes_to_gb(bucket.standard_size_avg),                                    # {:>10.1f}GB
            get_growth_factor(bucket.standard_size_start, bucket.standard_size_end),  # {:>12.2%}
            bytes_to_gb(bucket.glacier_size_avg),                                     # {:>10.1f}GB
            get_growth_factor(bucket.glacier_size_start, bucket.glacier_size_end),    # {:>12.2%}
            std_cost,                                                                 # {:>12.2f}
            glr_cost,                                                                 # {:>12.2f}
            std_cost + glr_cost                                                       # {:>12.2f}
        ))
def main(*args, **kwargs):
    # The Price List API is only served from a few regions; us-east-1 is the usual choice.
    pricing_client = boto3.client('pricing', region_name='us-east-1')
    build_pricing_defaults(pricing_client=pricing_client)
    pricing = get_s3_pricing(pricing_client=pricing_client)
    return build_report(s3_client, cloudwatch_client, pricing)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='List S3 Buckets and associated cost')
    parser.add_argument("-t", "--tempdir", dest="temp", default=tempfile.gettempdir(), help="Temp directory to store pricing lists")
    options = parser.parse_args()
    tmpdir = options.temp
    main()
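# Example invocation, assuming the script is saved as s3_storage_report.py (the
# gist does not fix a file name) and that AWS credentials with S3, CloudWatch,
# and Pricing read permissions are available in the environment:
#
#   python s3_storage_report.py --tempdir /var/tmp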