Skip to content

Instantly share code, notes, and snippets.

@Makeshift
Created April 2, 2023 00:06
Show Gist options
  • Save Makeshift/6b0e4606f640d9c026ba6448490fa9b7 to your computer and use it in GitHub Desktop.
Save Makeshift/6b0e4606f640d9c026ba6448490fa9b7 to your computer and use it in GitHub Desktop.
A script that deletes all noncurrent versions of files and delete markers in an S3 bucket
#!/usr/bin/env python3
# This script adapted from https://wasabi-support.zendesk.com/hc/en-us/articles/360058028992-How-do-I-mass-delete-non-current-versions-inside-a-bucket-
# This script is used to delete non-current versions of objects in a bucket. It will not delete the current version of an object.
# I also implemented refreshable sessions from here https://stackoverflow.com/a/69226170
"""
This script's first argument must be an S3 URI (s3://bucket-name/<optional path>) from which to begin searching for noncurrent versions.
It will enumerate all object versions under that prefix and, after confirmation, delete all noncurrent versions and delete markers.
Credentials are gathered automatically from the environment.
Wrap this script with aws-vault if that's how you get your credentials.
aws-vault exec profile -- ./delete_non-current-objects.py s3://bucket-name/path/to/folder
"""
import sys
from datetime import datetime, timezone
from time import time
from typing import Optional
from uuid import uuid4

from boto3 import Session
from boto3 import client
from botocore.credentials import RefreshableCredentials
from botocore.exceptions import ClientError
from botocore.session import get_session
class RefreshableBotoSession:
    """
    Boto helper class which builds a boto3 session backed by refreshable
    credentials, so that a client or resource created from it can be cached.

    Usage
    -----
    session = RefreshableBotoSession().refreshable_session()
    client = session.client("s3") # we now can cache this client object without worrying about expiring credentials
    """

    def __init__(
        self,
        region_name: Optional[str] = None,
        profile_name: Optional[str] = None,
        sts_arn: Optional[str] = None,
        session_name: Optional[str] = None,
        session_ttl: int = 3000,
    ):
        """
        Initialize `RefreshableBotoSession`

        Parameters
        ----------
        region_name : str (optional)
            Default region when creating new connection.
        profile_name : str (optional)
            The name of a profile to use.
        sts_arn : str (optional)
            The role arn to sts before creating session.
        session_name : str (optional)
            An identifier for the assumed role session. A random hex string
            is generated when omitted.
        session_ttl : int (optional)
            TTL in seconds for each set of credentials; once it has elapsed
            the credentials are fetched again.
            50 minutes by default which is before the default role expiration of 1 hour
        """
        self.region_name = region_name
        self.profile_name = profile_name
        self.sts_arn = sts_arn
        self.session_name = session_name or uuid4().hex
        self.session_ttl = session_ttl

    @staticmethod
    def _utc_expiry_iso(ttl_seconds: int, now: Optional[float] = None) -> str:
        """
        Return the ISO-8601 UTC timestamp `ttl_seconds` after `now`.

        Parameters
        ----------
        ttl_seconds : int
            Lifetime in seconds to add to the start time.
        now : float (optional)
            POSIX timestamp to start from; the current time when omitted.
        """
        start = time() if now is None else now
        # Convert straight to an aware UTC datetime. Building a naive local
        # datetime and then stamping it as UTC shifts the expiry by the
        # machine's UTC offset.
        return datetime.fromtimestamp(start + ttl_seconds, tz=timezone.utc).isoformat()

    def __get_session_credentials(self):
        """
        Get session credentials

        Returns
        -------
        dict
            Mapping with `access_key`, `secret_key`, `token` and `expiry_time`
            (ISO-8601 string), the metadata passed to
            `RefreshableCredentials.create_from_metadata`.

        Raises
        ------
        RuntimeError
            If no role is being assumed and boto3 cannot resolve any credentials.
        """
        session = Session(region_name=self.region_name,
                          profile_name=self.profile_name)

        # if sts_arn is given, get credential by assuming given role
        if self.sts_arn:
            sts_client = session.client(
                service_name="sts", region_name=self.region_name)
            response = sts_client.assume_role(
                RoleArn=self.sts_arn,
                RoleSessionName=self.session_name,
                DurationSeconds=self.session_ttl,
            ).get("Credentials")
            return {
                "access_key": response.get("AccessKeyId"),
                "secret_key": response.get("SecretAccessKey"),
                "token": response.get("SessionToken"),
                "expiry_time": response.get("Expiration").isoformat(),
            }

        resolved = session.get_credentials()
        if resolved is None:
            raise RuntimeError(
                "No AWS credentials could be resolved from the environment")
        # Take a consistent snapshot through the public API instead of reading
        # the object's __dict__, whose attribute names differ between
        # credential classes.
        frozen = resolved.get_frozen_credentials()
        return {
            "access_key": frozen.access_key,
            "secret_key": frozen.secret_key,
            "token": frozen.token,
            "expiry_time": self._utc_expiry_iso(self.session_ttl),
        }

    def refreshable_session(self) -> "Session":
        """
        Get refreshable boto3 session.

        Returns
        -------
        boto3.Session
            Session whose credentials are re-fetched through
            `__get_session_credentials` when they need refreshing.
        """
        # get refreshable credentials
        refreshable_credentials = RefreshableCredentials.create_from_metadata(
            metadata=self.__get_session_credentials(),
            refresh_using=self.__get_session_credentials,
            method="sts-assume-role",
        )

        # attach refreshable credentials current session
        session = get_session()
        session._credentials = refreshable_credentials
        session.set_config_variable("region", self.region_name)
        autorefresh_session = Session(botocore_session=session)
        return autorefresh_session
def calculate_size(size, _size_table):
    """
    This function dynamically calculates the right base unit symbol for size of the object.
    The value is divided by 1024 until it drops below 1024 or the largest unit
    in the table is reached, so oversized values are reported in the largest
    available unit instead of raising KeyError.
    :param size: size in integer to be dynamically calculated.
    :param _size_table: dictionary mapping the power of 1024 (0, 1, 2, ...) to its unit symbol.
    :return: string of converted size, rounded to two decimals, followed by the unit symbol.
    """
    count = 0
    # Only step up to the next unit while the table actually defines it.
    while size // 1024 > 0 and (count + 1) in _size_table:
        size = size / 1024
        count += 1
    return str(round(size, 2)) + ' ' + _size_table[count]
def create_connection_and_test(_bucket):
    """
    Checks if the credentials are valid and if the bucket exists.
    NOTE: creating the connection is not enough to test. We need to make a method call to check for its working status.
    :param _bucket: bucket name string
    :return: reference to the connection client, or None when the credential check fails
             (a message is printed in that case).
    :raises Exception: if the bucket does not exist or is not accessible with these credentials.
    """
    try:
        session = RefreshableBotoSession().refreshable_session()
        _s3_client = session.client('s3')
        # Test credentials are working
        _s3_client.list_buckets()
    except ClientError:
        print("Invalid Access and Secret keys")
        return None

    try:
        # Check the bucket that was passed in, not a module-level global.
        _s3_client.head_bucket(Bucket=_bucket)
    except ClientError as err:
        # The bucket does not exist or you have no access.
        raise Exception(
            "$ bucket does not exist in the account please re-check the name and try again: "
            + str(_bucket)) from err
    return _s3_client
def parse_s3_uri(s3_uri):
    """
    Split an S3 URI of the form s3://<bucket-name>/<optional prefix>.
    :param s3_uri: URI string; surrounding whitespace is ignored.
    :return: tuple of (bucket name, prefix); prefix is '' when the URI has none.
    :raises ValueError: if the URI does not start with s3:// or names no bucket.
    """
    scheme = 's3://'
    cleaned = s3_uri.strip()
    if cleaned[:len(scheme)].lower() != scheme:
        raise ValueError(
            "expected an S3 URI like s3://bucket-name/prefix, got: " + repr(s3_uri))
    bucket_name, _, key_prefix = cleaned[len(scheme):].partition('/')
    if not bucket_name:
        raise ValueError("no bucket name found in S3 URI: " + repr(s3_uri))
    return bucket_name, key_prefix


def delete_in_batches(s3_client, bucket_name, objects, batch_size=1000):
    """
    Delete specific object versions, sending at most batch_size keys per request.
    :param s3_client: boto3 S3 client used to issue delete_objects calls.
    :param bucket_name: bucket that holds the objects.
    :param objects: list of {'Key': ..., 'VersionId': ...} dictionaries.
    :param batch_size: number of entries per delete_objects call (1000 by default).
    :return: None; each response is printed.
    """
    for start in range(0, len(objects), batch_size):
        response = s3_client.delete_objects(
            Bucket=bucket_name,
            Delete={
                'Objects': objects[start:start + batch_size],
                'Quiet': True
            }
        )
        print(response)


if __name__ == '__main__':
    # Names below are deliberately module-level globals so that functions in
    # this file which read them keep working.

    # Generate a table for SI units symbol table.
    size_table = {0: 'Bs', 1: 'KBs', 2: 'MBs',
                  3: 'GBs', 4: 'TBs', 5: 'PBs', 6: 'EBs'}
    print("\n")
    print("\n")
    print("$ starting script...")

    # Assumes the input is an s3 URI following the format s3://<bucket-name>/<prefix>
    if len(sys.argv) < 2:
        print("usage: " + sys.argv[0] + " s3://bucket-name/<optional prefix>",
              file=sys.stderr)
        sys.exit(2)
    try:
        bucket, prefix = parse_s3_uri(sys.argv[1])
    except ValueError as err:
        print("$ " + str(err), file=sys.stderr)
        sys.exit(2)

    # test the connection and access keys. Also checks if the bucket is valid.
    s3_client = create_connection_and_test(bucket)
    if s3_client is None:
        # The credential check already printed why; nothing can be done without a client.
        sys.exit(1)

    # create a paginator with default settings.
    object_response_paginator = s3_client.get_paginator('list_object_versions')
    operation_parameters = {'Bucket': bucket}
    if prefix:
        operation_parameters['Prefix'] = prefix

    # initialize basic variables for in memory storage.
    versioned_object_size = 0
    current_object_count = 0
    current_object_size = 0
    delete_marker_list = []
    version_list = []

    print("$ Calculating, please wait... this may take a while")
    for object_response_itr in object_response_paginator.paginate(**operation_parameters):
        for delete_marker in object_response_itr.get('DeleteMarkers', []):
            delete_marker_list.append(
                {'Key': delete_marker['Key'], 'VersionId': delete_marker['VersionId']})
        for version in object_response_itr.get('Versions', []):
            if version['IsLatest']:
                # Current versions are only counted, never queued for deletion.
                current_object_count += 1
                current_object_size += version['Size']
            else:
                versioned_object_size += version['Size']
                version_list.append(
                    {'Key': version['Key'], 'VersionId': version['VersionId']})
        # Running total after each page, printed on one line as a progress indicator.
        total_count = len(delete_marker_list) + len(version_list) + current_object_count
        print(f'{total_count}\t', end='', flush=True)

    delete_marker_count = len(delete_marker_list)
    versioned_object_count = len(version_list)

    print("\n")
    print("-" * 10)
    print("$ Total Delete markers: " + str(delete_marker_count))
    print("$ Number of Current objects: " + str(current_object_count))
    print("$ Current Objects size: ", calculate_size(
        current_object_size, size_table))
    print("$ Number of Non-current objects: " + str(versioned_object_count))
    print("$ Non-current Objects size: ",
          calculate_size(versioned_object_size, size_table))
    print("$ Total size of current + non current objects: ",
          calculate_size(versioned_object_size + current_object_size, size_table))
    print("-" * 10)
    print("\n")

    # Ask once: only an explicit 'y' deletes; any other answer leaves the
    # bucket untouched and ends the script instead of prompting again forever.
    choice = input(
        "$ Do you wish to delete the delete markers and non-current objects? [y/n] ")
    if choice.strip().lower() == 'y':
        print("$ starting deletes now...")
        print("$ removing delete markers 1000 at a time")
        delete_in_batches(s3_client, bucket, delete_marker_list)
        print("$ removing old versioned objects 1000 at a time")
        delete_in_batches(s3_client, bucket, version_list)
    else:
        print("$ aight then.")

    print("$ All done.")
    print("\n")
    print("\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment