Skip to content

Instantly share code, notes, and snippets.

@hvanmegen
Last active February 8, 2024 21:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hvanmegen/0b15adcf67b32507e0c339a763e84f51 to your computer and use it in GitHub Desktop.
Save hvanmegen/0b15adcf67b32507e0c339a763e84f51 to your computer and use it in GitHub Desktop.
Cleanup script for Ceph: deletes a limited number of files and directories per run.
#!/usr/bin/python3
import os
import sys
import time
import argparse
import shutil
import math
try:
from tqdm import tqdm
except ImportError:
print("tqdm module not found. Please install it using 'pip install tqdm'")
sys.exit(1)
# Number of unexpected exceptions tolerated in one deletion pass before the
# script gives up and exits.
MAX_ERRORS = 5
# Ceiling (in seconds) for the slow-deletion back-off; once the back-off
# reaches this value the script exits.
MAX_WAIT_TIME = 10
# Running total of items removed successfully; also reported when the run is
# interrupted with CTRL-C.
NUM_DELETED = 0
# Number of items that could not be removed during the current pass.
NOT_DELETED = 0
# Wall-clock time at start-up, used for elapsed-time and throughput figures.
SCRIPT_START_TIME = time.time()
# Item limit requested on the command line (assigned when a pass starts).
NUM_ITEMS = 0
def numerize(n):
    """Format a number for display with thousands separators.

    Integers are rendered like ``1,234,567`` and floats with exactly two
    decimals like ``1,234.50``. Any other value is returned as ``str(n)``.
    """
    if isinstance(n, int):
        return f"{n:,}"
    if isinstance(n, float):
        return f"{n:,.2f}"
    return str(n)
def get_terminal_width():
    """Return the terminal width in columns.

    Falls back to 80 columns when the size cannot be determined (for example
    when output is not attached to a terminal).
    """
    return shutil.get_terminal_size(fallback=(80, 20)).columns
def truncate_path(path: str, max_length: int = 80, filler: str = "...") -> str:
    """Shorten ``path`` for display by collapsing its middle directories.

    Directory components around the middle of the path are replaced by
    ``filler`` (and merged with an adjacent ``filler``) until the result fits
    in ``max_length`` characters. The final component is always kept intact.

    Args:
        path: A ``/``-separated path.
        max_length: Maximum number of characters wanted in the result.
        filler: Placeholder that stands in for the removed directories.

    Returns:
        ``path`` unchanged when it already fits. Otherwise the shortened path,
        which always starts with ``/``. When only one directory component is
        left the result is ``/<filler>/<name>`` even if that is still longer
        than ``max_length``.
    """
    if len(path) <= max_length:
        return path
    fn = path.split("/")[-1]
    while len(path) > max_length:
        parts = path.lstrip("/").split("/")[:-1]
        if len(parts) <= 1:
            # Nothing left to collapse. Use the caller's filler here as well,
            # so a custom placeholder is honoured consistently.
            return f"/{filler}/{fn}"
        m = len(parts) // 2 - 1
        # Merge with an existing placeholder next to the middle so that only
        # one filler ever appears in the path.
        if parts[m] == filler:
            del parts[m]
        elif parts[m + 1] == filler:
            del parts[m + 1]
        parts[m] = filler
        path = "/" + "/".join(parts) + f"/{fn}"
    return path
def sort_by_depth(paths):
    """Return ``paths`` sorted by nesting depth, shallowest first.

    Depth is the number of ``os.sep`` characters in each path's parent
    directory; paths of equal depth are ordered alphabetically. A directory
    therefore always sorts ahead of the entries nested below it.
    """
    return sorted(paths, key=lambda p: (os.path.dirname(p).count(os.sep), p))
def retrieve_objects(path, limit, max_length):
    """Collect up to ``limit`` paths below ``path`` as deletion candidates.

    The tree is walked depth-first with ``os.scandir``. Every entry (file,
    directory or symlink) is recorded exactly once; real directories are
    descended into, symlinked directories are recorded but not followed.

    Args:
        path: Directory to scan.
        limit: Maximum number of entries to collect.
        max_length: Width available for the "Current path" status line, which
            is only shown when the module-level ``VERBOSE`` flag is set.

    Returns:
        The collected paths, ordered by ``sort_by_depth``.
    """
    objects = []
    progress_bar = tqdm(total=limit, disable=not VERBOSE, smoothing=0, leave=True, ascii=False, mininterval=0.2, maxinterval=2, bar_format="Indexing {l_bar} {bar} {r_bar}")

    def _recursive_scan(cur_path):
        if len(objects) >= limit:
            return
        if VERBOSE:
            truncated_cur_path = truncate_path(cur_path, max_length)
            progress_bar.display(msg="Current path: \033[1m" + truncated_cur_path + "\033[0m", pos=1)
        try:
            scanner = os.scandir(cur_path)
        except OSError:
            # Directory is unreadable or vanished since it was listed; skip it
            # rather than aborting the whole scan.
            return
        # The context manager releases the directory handle even when the
        # loop stops early because the limit was reached.
        with scanner:
            for entry in scanner:
                if len(objects) >= limit:
                    break
                # Record every entry once, whatever its type (this includes
                # dangling symlinks, which must not be queued a second time).
                objects.append(entry.path)
                progress_bar.update(1)
                if entry.is_dir(follow_symlinks=False):
                    _recursive_scan(entry.path)

    try:
        _recursive_scan(path)
        if VERBOSE:
            progress_bar.display(msg=None, pos=1)
    finally:
        progress_bar.close()
    return sort_by_depth(objects)
def remove_object(item):
    """Delete a single filesystem entry, reporting success as a boolean.

    Real directories are removed with ``os.rmdir`` (so they must be empty);
    everything else -- regular files, symlinks (including dangling ones and
    links to directories) and special files -- is removed with ``os.remove``.

    The deletion is simply attempted instead of being pre-checked with
    ``os.access``: whether an entry can be unlinked depends on its parent
    directory, not on the entry's own write bit, and ``os.access`` follows
    symlinks, so a pre-check wrongly rejects read-only files and dangling
    links.

    Args:
        item: Path of the entry to delete.

    Returns:
        ``True`` if the entry was removed, ``False`` if it could not be
        removed for any reason. This function does not raise for bad paths.
    """
    try:
        if os.path.isdir(item) and not os.path.islink(item):
            os.rmdir(item)
        else:
            os.remove(item)
    except (OSError, ValueError, TypeError):
        # OSError: missing entry, permission denied, directory not empty...
        # ValueError/TypeError: malformed path argument.
        return False
    return True
def main(args, repeat=False):
    """Delete up to ``args.number`` items from ``args.directory``.

    Args:
        args: Parsed command-line namespace providing ``directory``,
            ``number`` and ``verbose``.
        repeat: When true, keep running deletion passes. The loop ends when
            the directory is found empty (the process exits with status 0)
            or when a pass removes nothing, which means the remaining items
            cannot be deleted and further passes would never make progress.

    The process exits with status 1 when the slow-deletion back-off reaches
    ``MAX_WAIT_TIME`` or when more than ``MAX_ERRORS`` unexpected errors
    occur during a pass.
    """

    def deletion_action(args):
        """Run one scan-and-delete pass and return how many items it removed."""
        global NUM_DELETED
        global NUM_ITEMS
        global NOT_DELETED
        global VERBOSE

        dir_path, num_items, verbose = args.directory, args.number, args.verbose
        VERBOSE = verbose
        NUM_ITEMS = num_items
        NOT_DELETED = 0
        error_count = 0
        error_list = []
        deleted = 0
        wait_time = 2
        pass_start = time.time()

        # Peek at a single entry instead of listing the whole directory; the
        # target may hold a very large number of entries.
        with os.scandir(dir_path) as entries:
            if next(entries, None) is None:
                print("Nothing to delete!")
                sys.exit(0)

        print(f"Scanning \033[1m{dir_path}\033[0m and compiling list of \033[1m{numerize(num_items)}\033[0m items to delete:")
        max_path_length = get_terminal_width() - 20  # leave room for the status prefix
        items = retrieve_objects(dir_path, num_items, max_path_length)
        len_items = len(items)
        if len_items < num_items:
            print(f"Location \033[1m{dir_path}\033[0m only contains \033[1m{numerize(len_items)}\033[0m items available for deletion; object limit adjusted.")
            num_items = len_items
        print(f"List of \033[1m{numerize(num_items)}\033[0m items compiled. Proceeding with deletion:")

        with tqdm(total=num_items, disable=not verbose, smoothing=0, leave=True, ascii=False, mininterval=0.2, maxinterval=2, bar_format="Deleting {l_bar} {bar} {r_bar}") as progress_bar:
            start_time = time.time()
            for item in items:
                try:
                    deletion_start = time.time()
                    if verbose:
                        truncated_item = truncate_path(item, max_path_length)
                        progress_bar.display(msg="Deleting: \033[1m" + truncated_item + "\033[0m", pos=1)
                    if remove_object(item):
                        NUM_DELETED += 1
                        deleted += 1
                    else:
                        NOT_DELETED += 1
                        # Remember the failure so it can be listed afterwards.
                        error_list.append(item)
                    deletion_time = time.time() - deletion_start
                    if deletion_time > wait_time:
                        # The storage backend is struggling: double the pause,
                        # capped at MAX_WAIT_TIME, and give up (without a
                        # pointless final sleep) once the cap is reached.
                        wait_time = min(wait_time * 2, MAX_WAIT_TIME)
                        if wait_time >= MAX_WAIT_TIME:
                            print(f"File deletion exceeded {MAX_WAIT_TIME}s. Exiting.")
                            sys.exit(1)
                        time.sleep(wait_time)
                    progress_bar.update(1)
                except Exception as e:
                    error_count += 1
                    if error_count > MAX_ERRORS:
                        timeused = round(time.time() - start_time, 2)
                        print(f"Error: {e}. Took {timeused} {'second' if (timeused == 1) else 'seconds'}.")
                        sys.exit(1)
                    # Exponential back-off before moving on to the next item.
                    time.sleep(wait_time * (2 ** (error_count - 1)))
                if verbose:
                    progress_bar.refresh()
            if verbose:
                progress_bar.display(msg=None, pos=1)

        if NOT_DELETED >= 1:
            if verbose:
                print(f"Could not delete \033[1m{NOT_DELETED}\033[0m {'item' if (NOT_DELETED == 1) else 'items'}:")
                for failed in error_list:
                    item_type = "(dir)" if os.path.isdir(failed) else "(file)"
                    print(f"- {item_type} {failed}")
            NOT_DELETED = 0

        # Report what was actually removed in this pass, timed from the start
        # of the pass, so the figures stay meaningful in repeat mode.
        elapsed = time.time() - pass_start
        rate = deleted / elapsed if elapsed > 0 else 0.0
        print(f"Deleted \033[1m{numerize(deleted)}\033[0m items in \033[1m{elapsed:.2f}\033[0m seconds at \033[1m{rate:.2f}\033[0m it/s.\n")
        return deleted

    while True:
        deleted_in_pass = deletion_action(args)
        if not repeat:
            break
        if deleted_in_pass == 0:
            # Nothing could be removed; repeating would loop forever.
            print("No items could be deleted in this pass. Stopping.")
            break
# Command-line entry point: parse arguments, run the deletion, and print a
# short summary if the user interrupts the run.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="This script deletes limited number of files and directories from a given directory. It accepts a directory path, number of items to delete, and an optional verbose flag (-v) for detailed output. To keep repeating the deletion order, use --repeat (-r). (Henry van Megen <hvanmegen@gmail.com>)")
    parser.add_argument("directory")
    parser.add_argument("number", type=int)
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-r", "--repeat", action="store_true")
    args = parser.parse_args()
    if args.number < 1:
        # A non-positive limit selects nothing and would make --repeat spin.
        parser.error("number must be a positive integer")
    try:
        main(args, repeat=args.repeat)
    except KeyboardInterrupt:
        print("Process interrupted by user (CTRL-C). Exiting.")
        if NUM_DELETED >= 1:
            elapsed = time.time() - SCRIPT_START_TIME
            rate = NUM_DELETED / elapsed if elapsed > 0 else 0.0
            print(f"\nDeleted \033[1m{numerize(NUM_DELETED)}\033[0m items in \033[1m{elapsed:.2f}\033[0m seconds at \033[1m{rate:.2f}\033[0m it/s.\n")
        sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment