Skip to content

Instantly share code, notes, and snippets.

@juancresc
Last active June 29, 2022 14:25
Show Gist options
  • Save juancresc/515104d0c538066887d721dd3e099448 to your computer and use it in GitHub Desktop.
Save juancresc/515104d0c538066887d721dd3e099448 to your computer and use it in GitHub Desktop.
Get the size, cost, and location of every S3 bucket
import boto3
import datetime
from hurry.filesize import size
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
# Region setting; none of the functions visible in this file read it.
location_search = 'us-west-2'
#location_search = 'us-east-1'
# Module-level clients created once at import time with boto3's default
# configuration: S3 is used for bucket listing/location/tagging, and
# Cost Explorer ('ce') is used by get_cost.
s3_client = boto3.client('s3')
ce_client = boto3.client('ce')
def get_location(bucket_name):
    """Return the AWS region name that hosts *bucket_name*.

    The value comes from the ``LocationConstraint`` field of
    ``get_bucket_location``.  Two legacy values are normalized so the
    result can be used directly as a ``region_name``:

    * a missing/empty constraint means the bucket lives in ``us-east-1``;
    * the constraint ``'EU'`` means the bucket lives in ``eu-west-1``.

    Args:
        bucket_name: Name of the S3 bucket to look up.

    Returns:
        The region name as a string.
    """
    response = s3_client.get_bucket_location(Bucket=bucket_name)
    constraint = response.get('LocationConstraint')
    if not constraint:
        # us-east-1 buckets report a null LocationConstraint.
        return 'us-east-1'
    if constraint == 'EU':
        # Legacy alias; 'EU' itself is not a valid region name.
        return 'eu-west-1'
    return constraint
def get_size(bucket_name, location, storage_type='StandardStorage'):
    """Return the most recent ``BucketSizeBytes`` average for a bucket.

    Queries the ``AWS/S3`` CloudWatch namespace in the bucket's region for
    daily (86400 s) averages over the last three days and returns the
    value of the newest datapoint.

    Args:
        bucket_name: Name of the S3 bucket.
        location: Region name used to create the CloudWatch client.
        storage_type: Value of the ``StorageType`` dimension
            (defaults to ``'StandardStorage'``).

    Returns:
        The ``Average`` of the newest datapoint, or ``0`` when CloudWatch
        returns no datapoints for the window.
    """
    cw = boto3.client('cloudwatch', region_name=location)
    # Read the clock once so both ends of the window share one reference.
    # NOTE(review): datetime.now() is naive local time - confirm whether an
    # explicit UTC timestamp is intended here.
    now = datetime.now()
    response = cw.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='BucketSizeBytes',
        Dimensions=[
            {'Name': 'BucketName', 'Value': bucket_name},
            {'Name': 'StorageType', 'Value': storage_type},
        ],
        Statistics=['Average'],
        Period=86400,
        StartTime=(now - timedelta(days=3)).isoformat(),
        EndTime=now.isoformat(),
    )
    datapoints = response["Datapoints"]
    if not datapoints:
        return 0
    # Datapoints are not guaranteed to arrive in chronological order, so
    # pick the newest one by timestamp instead of trusting list position.
    newest = max(datapoints, key=lambda point: point["Timestamp"])
    return newest["Average"]
def get_cost(tag_name, tag_cost):
    """Return the average blended S3 cost per period for a tagged bucket.

    Asks Cost Explorer for ``BlendedCost`` at ``MONTHLY`` granularity over
    the three months ending today, restricted to the
    ``Amazon Simple Storage Service`` service and to resources whose
    ``Name`` tag equals *tag_name* and whose ``Cost Center`` tag equals
    *tag_cost*.  The amounts of all returned periods are averaged.

    Args:
        tag_name: Value of the bucket's ``Name`` tag.
        tag_cost: Value of the bucket's ``Cost Center`` tag.

    Returns:
        The average amount rounded to two decimals, or ``0`` when either
        tag is missing/falsy or Cost Explorer returns no periods.
    """
    if not tag_name or not tag_cost:
        # Without both tags the cost cannot be attributed to a bucket.
        return 0

    # Read the clock once so Start and End are derived from the same day.
    today = datetime.now()
    response = ce_client.get_cost_and_usage(
        TimePeriod={
            'Start': (today - relativedelta(months=3)).strftime("%Y-%m-%d"),
            'End': today.strftime("%Y-%m-%d"),
        },
        Granularity='MONTHLY',
        Metrics=['BlendedCost'],
        Filter={
            'And': [
                {
                    'Dimensions': {
                        'Key': 'SERVICE',
                        'MatchOptions': ['EQUALS'],
                        'Values': ['Amazon Simple Storage Service'],
                    }
                },
                {
                    'Tags': {
                        'Key': 'Name',
                        'Values': [tag_name],
                    }
                },
                {
                    'Tags': {
                        'Key': 'Cost Center',
                        'Values': [tag_cost],
                    }
                },
            ],
        },
    )

    amounts = [
        float(period['Total']['BlendedCost']['Amount'])
        for period in response['ResultsByTime']
    ]
    if not amounts:
        # Avoid dividing by zero when no periods are returned.
        return 0
    # NOTE(review): the window rarely aligns with calendar months, so the
    # first/last periods may be partial - confirm this average is intended.
    return round(sum(amounts) / len(amounts), 2)
def get_tag(bucket_name):
    """Look up the ``Cost Center`` and ``Name`` tags of a bucket.

    Args:
        bucket_name: Name of the S3 bucket whose tag set is read.

    Returns:
        A ``(tag_cost, tag_name)`` tuple.  Each element is the tag's value,
        or ``False`` when that tag is absent.  ``(False, False)`` is
        returned when the tagging request raises ``ClientError``.
    """
    try:
        tagging = s3_client.get_bucket_tagging(Bucket=bucket_name)
    except ClientError:
        return False, False

    # Start from "not found" and overwrite for each tag key of interest.
    found = {'Cost Center': False, 'Name': False}
    for entry in tagging['TagSet']:
        key = entry['Key']
        if key in found:
            found[key] = entry['Value']
    return found['Cost Center'], found['Name']
def main():
    """Print one CSV row per S3 bucket with location, tags, cost and size.

    A header line is printed first.  For every bucket returned by
    ``list_buckets`` the row contains: bucket name, region, ``Cost Center``
    tag, ``Name`` tag, average cost, human-readable and raw size for
    ``StandardStorage``, and human-readable and raw size for
    ``DeepArchiveStorage``.
    """
    listing = s3_client.list_buckets()
    header = "bucket_name,location,tag_cost,tag_name,avg_cost,std_size,std_size_bytes,gla_size,gla_size_bytes"
    print(header)
    for entry in listing['Buckets']:
        name = entry['Name']
        region = get_location(name)
        cost_center, name_tag = get_tag(name)
        avg_cost = get_cost(name_tag, cost_center)
        std_bytes = get_size(name, region)
        archive_bytes = get_size(name, region, storage_type='DeepArchiveStorage')
        row = (
            name,
            region,
            cost_center,
            name_tag,
            avg_cost,
            size(std_bytes),
            std_bytes,
            size(archive_bytes),
            archive_bytes,
        )
        print(",".join(str(field) for field in row))
# Run the report only when executed as a script, so importing this module
# does not trigger AWS calls as a side effect.
if __name__ == "__main__":
    main()
#bucket_size = get_size("dsp-rtbproducts-use1-dev", 'us-east-1')
#print(bucket_size)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment