Skip to content

Instantly share code, notes, and snippets.

@imrehg
Last active February 1, 2021 16:03
Show Gist options
Save imrehg/3190eba089cf4fbeba657f62e580a70e to your computer and use it in GitHub Desktop.
Batch delete files from a versioned S3 bucket
import argparse
import math
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import boto3
from tqdm.auto import tqdm
# Module-wide S3 client, plus the list of {"Key", "VersionId"} dicts that the
# rest of the script fills in and eventually deletes.
client = boto3.client("s3")
objects = []

# Command-line interface. --infile, --prefix and --versionsfile each name a
# different source of objects; nothing is deleted unless --delete is given.
parser = argparse.ArgumentParser(description="Delete files from S3")
parser.add_argument("--bucket", required=True)
parser.add_argument("--infile")
parser.add_argument("--prefix")
parser.add_argument("--versionsfile")
parser.add_argument("--delete", dest="delete", action="store_true")
parser.add_argument("--workers", type=int, default=5)
parser.set_defaults(delete=False)
args = parser.parse_args()

# Page size used when listing versions and number of entries sent per
# delete_objects request.
batch = 1000

# Refuse to run when more than one object source was supplied.
if sum(bool(source) for source in (args.infile, args.prefix, args.versionsfile)) > 1:
    sys.exit(
        "Stopping: please set only one of --infile, --prefix, or --versionsfile!"
    )
def get_versions(client, bucket, key, progress=True, max_keys=1000):
    """Return every object version and delete marker listed under ``key``.

    Pages through ``client.list_object_versions`` until the response is no
    longer truncated, following ``NextKeyMarker``/``NextVersionIdMarker``.

    Args:
        client: boto3 S3 client (anything exposing ``list_object_versions``).
        bucket: name of the bucket to list.
        key: value sent as ``Prefix``; S3 treats it as a prefix, so versions
            of other keys that start with this string are returned too.
        progress: when true, show a tqdm bar counting list requests.
        max_keys: ``MaxKeys`` sent with every request (same value on all
            pages).

    Returns:
        A list of ``{"Key": ..., "VersionId": ...}`` dicts. Within each page,
        the versions come first, followed by the delete markers, in the order
        the service returned them.
    """
    results = []
    pbar = tqdm(desc="Gathering file list (requests)") if progress else None
    # One request dict reused across pages; only the markers change.
    request = {"Bucket": bucket, "Prefix": key, "MaxKeys": max_keys}
    try:
        while True:
            response = client.list_object_versions(**request)
            if pbar is not None:
                pbar.update(1)
            # Delete markers are versions too and must be removed to fully
            # purge a key, so both lists are collected.
            entries = response.get("Versions", []) + response.get(
                "DeleteMarkers", []
            )
            for entry in entries:
                results.append(
                    {"Key": entry["Key"], "VersionId": entry["VersionId"]}
                )
            if not response.get("IsTruncated"):
                break
            request["KeyMarker"] = response["NextKeyMarker"]
            request["VersionIdMarker"] = response["NextVersionIdMarker"]
    finally:
        # Close the bar even when a request raises, so the terminal is left
        # in a clean state.
        if pbar is not None:
            pbar.close()
    return results
# Collect the {"Key", "VersionId"} pairs to delete from whichever source was
# selected on the command line.
if args.infile:
    # One key per line. Blank lines are skipped: an empty key would be sent as
    # Prefix="" and list every object version in the bucket.
    with open(args.infile, "r") as input_list:
        keys = [line.strip() for line in tqdm(input_list.readlines())]
    keys = [key for key in keys if key]
    with ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {
            executor.submit(get_versions, client, args.bucket, key, False): key
            for key in keys
        }
        # Record what was found as "<key>,<version id>" lines so a later run
        # can replay it with --versionsfile.
        with open(args.infile + ".list", "w") as output_list:
            for future in tqdm(as_completed(futures), total=len(futures)):
                key = futures[future]
                try:
                    # get_versions lists by prefix, which also matches other
                    # keys that merely start with `key` (e.g. "a" and "a.txt");
                    # keep only versions of the exact key from the file.
                    versions = [
                        v for v in future.result() if v["Key"] == key
                    ]
                except Exception as exc:
                    # Best effort: report the failing key and keep going.
                    print("%r generated an exception: %s" % (key, exc))
                    continue
                objects += versions
                for version in versions:
                    output_list.write(
                        f"{version['Key']},{version['VersionId']}\n"
                    )
elif args.prefix:
    objects = get_versions(client, args.bucket, args.prefix)
elif args.versionsfile:
    # One "<key>,<version id>" pair per line. Split on the last comma so keys
    # that themselves contain commas are kept intact; skip blank lines.
    with open(args.versionsfile, "r") as input_list:
        for line in tqdm(input_list.readlines()):
            line = line.strip()
            if not line:
                continue
            key, version = line.rsplit(",", 1)
            objects.append({"Key": key, "VersionId": version})
# Report what was found. Only with --delete, and after the user types back the
# exact count, remove the entries in chunks of `batch` per request.
num_objects_to_delete = len(objects)
print(
    f"Number of objects (keys/version) in the found for deletion: {num_objects_to_delete}"
)
if args.delete:
    if num_objects_to_delete > 0:
        answer = input("Please enter the number of objects to delete to continue: ")
        # Anything that is not an integer is treated as a failed confirmation
        # rather than crashing with a traceback.
        try:
            number_confirmation = int(answer)
        except ValueError:
            number_confirmation = None
        if number_confirmation != num_objects_to_delete:
            sys.exit("Delete confirmation failed!")
    failed = []
    for start in tqdm(range(0, num_objects_to_delete, batch)):
        thisbatch = objects[start : start + batch]
        response = client.delete_objects(
            Bucket=args.bucket,
            Delete={"Objects": thisbatch, "Quiet": False},
        )
        # delete_objects reports per-entry failures in "Errors" instead of
        # raising, so collect them to avoid claiming success silently.
        failed += response.get("Errors", [])
    for error in failed:
        print(
            f"Failed to delete {error.get('Key')!r} "
            f"(version {error.get('VersionId')}): "
            f"{error.get('Code')} {error.get('Message')}",
            file=sys.stderr,
        )
    if failed:
        print(f"{len(failed)} object(s) could not be deleted", file=sys.stderr)
    print("Done delete")
else:
    print("Finished!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment