Skip to content

Instantly share code, notes, and snippets.

@Exagone313
Created July 31, 2023 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Exagone313/213e05462c4b6e59b33aa10f7dfaa8bf to your computer and use it in GitHub Desktop.
Save Exagone313/213e05462c4b6e59b33aa10f7dfaa8bf to your computer and use it in GitHub Desktop.
AWS S3 deletion

AWS S3 deletion

Do not use in production

import os
import boto3
def main():
aws_profile = os.getenv("AWS_PROFILE")
if not aws_profile:
raise ValueError("Missing variable AWS_PROFILE")
bucket_name = os.getenv("S3_BUCKET")
if not bucket_name:
raise ValueError("Missing variable S3_BUCKET")
session = boto3.session.Session(profile_name=aws_profile)
s3_resource = session.resource("s3")
if not s3_resource.Bucket(bucket_name).creation_date:
raise ValueError(f"Unknown bucket {bucket_name}")
markers = []
s3_client = session.client("s3")
s3_paginator = s3_client.get_paginator("list_object_versions")
s3_pages = s3_paginator.paginate(Bucket=bucket_name)
for s3_page in s3_pages:
if "DeleteMarkers" in s3_page:
print(f"Found {len(s3_page['DeleteMarkers'])} delete markers")
for s3_delete_marker in s3_page["DeleteMarkers"]:
markers.append({
"Key": s3_delete_marker["Key"],
"VersionId": s3_delete_marker["VersionId"]
})
if len(markers) > 999:
print(f"Will delete {len(markers)} markers")
s3_client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": markers,
},
)
markers = []
if len(markers) > 1:
print(f"Will delete {len(markers)} markers")
s3_client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": markers,
},
)
if __name__ == "__main__":
main()
import os
from datetime import datetime, timedelta, timezone
import boto3
def main():
aws_profile = os.getenv("AWS_PROFILE")
if not aws_profile:
raise ValueError("Missing variable AWS_PROFILE")
bucket_name = os.getenv("S3_BUCKET")
if not bucket_name:
raise ValueError("Missing variable S3_BUCKET")
days = os.getenv("DAYS")
if not days:
raise ValueError("Missing variable DAYS")
days = int(days)
assert days > 0
date_threshold = datetime.now(timezone.utc) - timedelta(days=days)
session = boto3.session.Session(profile_name=aws_profile)
s3_resource = session.resource("s3")
if not s3_resource.Bucket(bucket_name).creation_date:
raise ValueError(f"Unknown bucket {bucket_name}")
keys = []
s3_client = session.client("s3")
s3_paginator = s3_client.get_paginator("list_objects_v2")
s3_pages = s3_paginator.paginate(Bucket=bucket_name)
for s3_page in s3_pages:
if "Contents" in s3_page:
print(f"Found {len(s3_page['Contents'])} objects")
for key_object in s3_page["Contents"]:
if key_object["LastModified"] < date_threshold:
keys.append({
"Key": key_object["Key"],
})
if len(keys) > 999:
print(f"Will delete {len(keys)} objects")
s3_client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": keys,
},
)
keys = []
if len(keys) > 1:
print(f"Will delete {len(keys)} keys")
s3_client.delete_objects(
Bucket=bucket_name,
Delete={
"Objects": keys,
},
)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment