Skip to content

Instantly share code, notes, and snippets.

@zonque
Last active November 26, 2023 20:31
Show Gist options
  • Save zonque/6e54cadc990b3aa911b364565ae41039 to your computer and use it in GitHub Desktop.
Save zonque/6e54cadc990b3aa911b364565ae41039 to your computer and use it in GitHub Desktop.
Remove objects from s3
import concurrent
import concurrent.futures
import os
import pathlib
import threading

import boto3
import magic
# --- AWS connection settings -------------------------------------------------
region_name = "eu-central-1"
bucket_name = ""
access_key_id = ""
secret_access_key = ""

# --- Run behaviour -----------------------------------------------------------
# When True, the script only reports what it would purge.
dry_run = False
# Values greater than 1 check objects through a thread pool of this size.
num_threads = 10
# Directory holding empty marker files that record already-checked keys.
cache_dir = "cache/"

# --- Selection criteria ------------------------------------------------------
# Only keys starting with this prefix are inspected ("" inspects every key).
key_prefix = "upload/"
# Objects larger than this many bytes (100 MiB) are removed.
max_size = 100 * 1024 * 1024

# MIME types (as sniffed from the first bytes of each object) that may stay in
# the bucket; anything else is removed.
allowed_types = [
    # Documents and archives
    "application/epub+zip",
    "application/gzip",
    "application/json",
    "application/msword",
    "application/ogg",
    # NOTE(review): libmagic reports PDF files as "application/pdf"; with this
    # spelling PDFs are not allowed and will be purged -- confirm this is
    # intended.
    "application/pdfx",
    "application/rtf",
    "application/vnd.ms-excel",
    "application/vnd.ms-powerpoint",
    "application/vnd.oasis.opendocument.text",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.rar",
    "application/x-7z-compressed",
    "application/x-bzip",
    "application/x-bzip2",
    "application/x-tar",
    "application/xhtml+xml",
    "application/xml",
    "application/zip",
    # Audio
    "audio/aac",
    "audio/mpeg",
    "audio/ogg",
    "audio/wav",
    "audio/webm",
    # Images
    "image/bmp",
    "image/gif",
    "image/jpeg",
    "image/png",
    "image/svg+xml",
    "image/tiff",
    "image/vnd.microsoft.icon",
    "image/webp",
    # Text
    "text/csv",
    "text/html",
    "text/plain",
    "text/xml",
    # Video
    "video/mp4",
    "video/mpeg",
    "video/ogg",
    "video/x-msvideo",
]
# Open an S3 resource handle for the configured region and credentials.
session = boto3.Session(
    aws_access_key_id=access_key_id,
    aws_secret_access_key=secret_access_key,
)
s3 = session.resource('s3', region_name=region_name)
bucket = s3.Bucket(bucket_name)

# State shared between worker threads; `lock` guards the progress counter and
# the deletion bookkeeping.
lock = threading.Lock()
delete_objects = []
will_free_bytes = 0
count = 0

# List every key up front so progress can be reported against a fixed total.
print("Gathering objects")
keys = [summary.key for summary in bucket.objects.all()]
total = len(keys)
print(f"Got {total} keys in bucket")
def check_object(key):
    """Inspect one S3 object and queue it for deletion if it is not acceptable.

    An object is queued when its size exceeds ``max_size`` or when the MIME
    type sniffed from its first 2048 bytes is not in ``allowed_types``.
    Objects that pass are recorded with an empty marker file below
    ``cache_dir`` so later runs skip them.

    Keys that already have a marker file, keys outside ``key_prefix`` and
    objects served as ``application/x-directory`` are ignored.

    Args:
        key: Key of the object inside ``bucket_name``.

    Side effects:
        Increments the global ``count``; may append to ``delete_objects`` and
        add to ``will_free_bytes`` (all under ``lock``); may create a marker
        file; prints progress to stdout.
    """
    global count, will_free_bytes

    # Claim a progress slot atomically; `total` never changes once the
    # workers are running, so the percentage can be computed outside the lock.
    with lock:
        n = count
        count += 1
    percent = (n * 100) / total

    # Both of these checks need only the key, so do them before spending a
    # GET request on the object.
    cache_file = cache_dir + key
    if os.path.isfile(cache_file):
        print(f"Skipping {key} (cached) ({n}/{total}, {percent:,.1f}%) ...")
        return
    if key_prefix != "" and not key.startswith(key_prefix):
        return

    obj = s3.Object(bucket_name, key).get()
    body = obj['Body']
    try:
        headers = obj['ResponseMetadata']['HTTPHeaders']
        content_type = headers['content-type'].split(";")[0]
        content_len = int(headers['content-length'])

        if content_type == "application/x-directory":
            return

        print(f"Checking {key}, size {content_len} ({n}/{total}, {percent:,.1f}%) ...")

        delete = False
        if content_len > max_size:
            print(f"{key} is too large ({content_len} bytes), removing")
            delete = True
        else:
            # Trust the sniffed type, not the Content-Type header.
            content_type = magic.from_buffer(body.read(2048), mime=True)
            if content_type not in allowed_types:
                print(f"{key} ({content_len} bytes) has content type {content_type}, removing")
                delete = True
    finally:
        # Release the HTTP connection even though most of the body is unread.
        body.close()

    if delete:
        with lock:
            delete_objects.append({'Key': key})
            will_free_bytes += content_len
        # Do not mark a flagged object as checked: the deletion list lives
        # only in memory, so after a dry run or an interrupted run the object
        # must be re-examined instead of being skipped forever.
        return

    # Mark the object as checked. exist_ok avoids a race between worker
    # threads creating the same directory.
    dir_name = os.path.dirname(cache_file)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    pathlib.Path(cache_file).touch()
# Check every key, in a thread pool when more than one thread is configured.
if num_threads > 1:
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = {executor.submit(check_object, key): key for key in keys}
        # An exception raised in a worker is stored on its future; report it
        # instead of letting it vanish unnoticed.
        for future in concurrent.futures.as_completed(futures):
            error = future.exception()
            if error is not None:
                print(f"Checking {futures[future]} failed: {error!r}")
else:
    for key in keys:
        check_object(key)

# Report or purge whatever the checks queued for deletion.
if dry_run:
    print(f"Dry run, not purging {len(delete_objects)} objects ({will_free_bytes} bytes)")
elif not delete_objects:
    print("No objects marked for deletion")
else:
    print(f"Purging {len(delete_objects)} objects (will free {will_free_bytes} bytes)...")
    # S3 DeleteObjects accepts at most 1000 keys per request, so send the
    # list in batches.
    batch_size = 1000
    for start in range(0, len(delete_objects), batch_size):
        response = bucket.delete_objects(
            Delete={
                'Objects': delete_objects[start:start + batch_size],
                'Quiet': True,
            }
        )
        # In quiet mode the response lists only the keys that failed.
        for failure in (response or {}).get('Errors', []):
            print(
                f"Failed to delete {failure.get('Key')}: "
                f"{failure.get('Code')} {failure.get('Message')}"
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment