Created
March 10, 2024 15:12
-
-
Save shokinn/4d0fd4a72de471b26c4e2b2c258da451 to your computer and use it in GitHub Desktop.
Python script to delete remote files that have already been synced (for those who forgot to use `rclone move`). Requires tqdm.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import subprocess | |
import argparse | |
from datetime import datetime, timedelta | |
import concurrent.futures | |
from tqdm import tqdm # For progress bar | |
import logging | |
import sys | |
# Root of the local tree that is scanned, and the rclone remote that mirrors it.
local_path = '/path/to/local/data'
remote_path = 'remoteName:/path/to/remote/data'

# Paths that must never be scanned. Each entry is joined onto local_path, so
# an entry may be relative to local_path or already absolute (os.path.join
# keeps an absolute entry unchanged).
excluded_paths = [
    os.path.join(local_path, entry)
    for entry in (
        '@Recently-Snapshot',
        '@Recycle',
        'fist-path',
        'second/path',
    )
]
# Parse command line arguments
def _positive_int(value):
    """argparse ``type`` callable: convert *value* to an int and require it to be >= 1.

    Rejecting 0 and negative thread counts here makes the script fail at
    start-up with a usage error instead of failing later, when the thread
    pool is created after the whole scan has already run.
    """
    number = int(value)  # a ValueError here is reported by argparse as an invalid value
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value!r}")
    return number


parser = argparse.ArgumentParser(description="Delete files older than a day from a remote location.")
parser.add_argument("--delete", action="store_true", help="Enable actual deletion of files. Use with caution!")
parser.add_argument("--threads", type=_positive_int, default=4, help="Number of threads to use for file processing.")
parser.add_argument("--progress", action="store_true", help="Show progress bar.")
# The default log file lives next to the script itself (directory of sys.argv[0]).
parser.add_argument("--logfile", type=str, default=os.path.join(os.path.dirname(sys.argv[0]), "deletion_log.log"), help="Path to the log file.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose mode.")
parser.add_argument("--ignore-errors", action="store_true", help="Continue on errors while processing files.")
args = parser.parse_args()
# A log file left over from an earlier run is only removed when the user
# explicitly answers "yes"; any other answer keeps it, and logging appends.
if os.path.exists(args.logfile):
    answer = input(f"The logfile {args.logfile} already exists. Delete it? (yes/no): ")
    if answer.lower() != 'yes':
        print("Continuing without deleting the logfile.")
    else:
        os.remove(args.logfile)
        print(f"Deleted logfile {args.logfile}.")

# Route all logging calls of this script to the chosen log file.
logging.basicConfig(
    filename=args.logfile,
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Reference instant against which file ages are measured.
now = datetime.now()
# Function to check if file or dir should be excluded
def is_excluded(file_path, excluded=None):
    """Return True when *file_path* is an excluded path or lies inside one.

    Paths are compared on path-component boundaries rather than by raw string
    prefix, so excluding ``/data/foo`` no longer also excludes the sibling
    ``/data/foo-backup``.

    Args:
        file_path: Path of the file or directory to test.
        excluded: Optional iterable of excluded paths. When omitted, the
            module-level ``excluded_paths`` list is used.

    Returns:
        True if the path equals an excluded path or is located below it,
        otherwise False.
    """
    candidates = excluded_paths if excluded is None else excluded
    target = os.path.normpath(file_path)
    for excluded_path in candidates:
        base = os.path.normpath(excluded_path)
        # rstrip keeps a root entry ("/") from turning into a "//" prefix.
        if target == base or target.startswith(base.rstrip(os.sep) + os.sep):
            return True
    return False
# Function to get the file's time considering birthtime or mtime
def get_file_time(file_path):
    """Return the creation time of *file_path*, falling back to its mtime.

    The lookup order is:
      1. ``st_birthtime`` from ``os.stat`` when the platform provides it,
         which avoids spawning a process per file;
      2. the external ``stat -c %W`` command (birth time as seconds since the
         epoch; the code treats a value of 0 or less as "not available");
      3. the modification time from ``os.stat``.

    Args:
        file_path: Path of the local file to inspect.

    Returns:
        A ``(datetime, source)`` tuple where *source* is ``'birthtime'`` or
        ``'mtime'`` and tells which timestamp was used.

    Raises:
        OSError: If the file itself cannot be stat'ed (e.g. it is missing).
    """
    # One stat call serves both the native birth time and the mtime fallback.
    stat_result = os.stat(file_path)

    native_birth = getattr(stat_result, 'st_birthtime', None)
    if native_birth:
        return datetime.fromtimestamp(native_birth), 'birthtime'

    try:
        # Command to get creation time (birth time) in Unix
        output = subprocess.check_output(['stat', '-c', '%W', file_path])
        creation_time = int(output.strip())
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        # Covers a missing `stat` binary, a non-zero exit status and
        # unparsable output; in every case the mtime is used instead.
        print(f"An error occurred while fetching file time for {file_path}: {e}")
        creation_time = 0

    if creation_time > 0:
        return datetime.fromtimestamp(creation_time), 'birthtime'
    # Fall back to modification time if birth time is 0 or not available
    return datetime.fromtimestamp(stat_result.st_mtime), 'mtime'
# Collect every non-excluded file below local_path up front so that the
# progress bar can be given an exact total.
print("Prepare file list and count total files for progress. Please wait.")
file_paths = []
for current_dir, subdirs, filenames in os.walk(local_path):
    # Slice-assign so os.walk sees the pruned list and never descends into
    # an excluded directory.
    kept_dirs = [sub for sub in subdirs if not is_excluded(os.path.join(current_dir, sub))]
    subdirs[:] = kept_dirs
    candidates = (os.path.join(current_dir, filename) for filename in filenames)
    file_paths.extend(candidate for candidate in candidates if not is_excluded(candidate))

# Number of files left after the exclusions were applied.
total_files = len(file_paths)

# Filled by the scan below with the remote paths that qualify for deletion.
files_to_process = []

# The scanning progress bar only exists when --progress was given.
if args.progress:
    progress_iterator = tqdm(total=total_files, desc="Scanning files")
# Function to check if file or dir should be excluded
# NOTE: a function with this same name is also defined earlier in the file;
# this later definition rebinds the name for all code that runs after it.
def is_excluded(file_path, excluded=None):
    """Return True when *file_path* is an excluded path or lies inside one.

    Paths are compared on path-component boundaries rather than by raw string
    prefix, so excluding ``/data/foo`` no longer also excludes the sibling
    ``/data/foo-backup``.

    Args:
        file_path: Path of the file or directory to test.
        excluded: Optional iterable of excluded paths. When omitted, the
            module-level ``excluded_paths`` list is used.

    Returns:
        True if the path equals an excluded path or is located below it,
        otherwise False.
    """
    candidates = excluded_paths if excluded is None else excluded
    target = os.path.normpath(file_path)
    for excluded_path in candidates:
        base = os.path.normpath(excluded_path)
        # rstrip keeps a root entry ("/") from turning into a "//" prefix.
        if target == base or target.startswith(base.rstrip(os.sep) + os.sep):
            return True
    return False
# Walk through all files in the local directory and queue the remote
# counterpart of every file that is older than one day.
for file_path in file_paths:
    try:
        if is_excluded(file_path):  # Skip excluded files and directories
            continue
        try:
            birth_time, time_used = get_file_time(file_path)  # birthtime, or mtime as fallback
            if now - birth_time > timedelta(days=1):
                relative_path = os.path.relpath(file_path, local_path)
                # Convert only the local path separator to '/'. A blanket
                # replace of '\\' would also rewrite literal backslashes in
                # POSIX file names and point at the wrong remote object.
                remote_relative = relative_path.replace(os.sep, '/')
                remote_file_path = remote_path.rstrip('/') + '/' + remote_relative
                files_to_process.append(remote_file_path)
                # Log queued files with information on which time attribute was used
                logging.info(f'File queued for deletion: {remote_file_path} (used {time_used})')
        except Exception as exc:
            if not args.ignore_errors:
                raise  # Re-raise the exception and stop the script
            error_msg = f'Error processing file {file_path}: {exc}'
            logging.error(error_msg)
            if args.verbose:
                print(error_msg)
    finally:
        # Runs for skipped and failed files as well, so the bar always
        # reaches its total instead of stalling on ignored errors.
        if args.progress:
            progress_iterator.update(1)
if args.progress:
    progress_iterator.close()
# Confirmation for deletion: only asked when --delete was given; anything
# other than "yes" (case-insensitive, surrounding whitespace ignored) aborts.
if args.delete:
    confirm = input("Are you sure you want to delete files? This cannot be undone. Type 'yes' to continue: ")
    if confirm.strip().lower() != 'yes':
        print("Deletion not confirmed. Exiting.")
        # sys.exit() instead of exit(): the latter is a helper injected by the
        # `site` module for interactive sessions and is not always available.
        sys.exit()
# Function to delete a file
def delete_file(remote_file_path):
    """Delete one remote file with ``rclone deletefile``, or log a dry run.

    When ``args.delete`` is false nothing is executed; the path is only
    logged as "Would delete".

    Args:
        remote_file_path: Full rclone path (``remote:path/to/file``) to delete.

    Returns:
        True if rclone ran and exited successfully; False for a dry run, a
        failing rclone call, or when rclone could not be started.
    """
    if not args.delete:
        logging.info(f'Would delete {remote_file_path}')
        return False

    try:
        # Argument list without a shell: the file name reaches rclone
        # verbatim, so quotes, `$(...)` or backticks in a name cannot be
        # interpreted as shell syntax.
        subprocess.run(
            ['rclone', 'deletefile', remote_file_path],
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as e:
        # Prefer captured stderr for the log; fall back to stdout when it is empty.
        logging.error(f'Error deleting {remote_file_path}: {e.stderr or e.output}')
        return False
    except OSError as e:
        # Raised when the rclone executable is missing or cannot be executed.
        logging.error(f'Error deleting {remote_file_path}: {e}')
        return False

    logging.info(f'Deleted {remote_file_path}')
    return True
# Hand every queued remote path to delete_file on a thread pool and report
# each outcome as soon as its future completes.
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
    # Map each future back to the remote path it was submitted for.
    futures = {}
    for queued_path in files_to_process:
        futures[executor.submit(delete_file, queued_path)] = queued_path

    finished = concurrent.futures.as_completed(futures)
    for future in tqdm(finished, total=len(files_to_process), desc="Processing files", disable=not args.progress):
        file = futures[future]
        try:
            success = future.result()
            # Console output is produced in verbose mode only.
            if args.verbose:
                print(f'Deleted {file}' if success else f'Skipped or failed {file}')
        except Exception as exc:
            failure = f'{file} generated an exception: {exc}'
            logging.error(failure)
            if args.verbose:
                print(failure)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment