@shokinn
Created March 10, 2024 15:12
Python script to delete already-synced remote files (for those of us who stupidly forgot to use rclone move). Requires tqdm.
import os
import subprocess
import argparse
from datetime import datetime, timedelta
import concurrent.futures
from tqdm import tqdm # For progress bar
import logging
import sys
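# Example invocations (the filename below is only an assumption; use whatever name you saved this script under):
#   python3 delete_remote_synced.py --progress --threads 8            # dry run: nothing is deleted, actions are only logged
#   python3 delete_remote_synced.py --progress --threads 8 --delete   # actually delete the queued files on the remote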
# Set your local and remote paths
local_path = '/path/to/local/data'
# Define your excluded paths (absolute paths or relative to local_path)
excluded_paths = [
    os.path.join(local_path, '@Recently-Snapshot'),
    os.path.join(local_path, '@Recycle'),
    os.path.join(local_path, 'first-path'),
    os.path.join(local_path, 'second/path'),
]
remote_path = 'remoteName:/path/to/remote/data'
# Parse command line arguments
parser = argparse.ArgumentParser(description="Delete files older than a day from a remote location.")
parser.add_argument("--delete", action="store_true", help="Enable actual deletion of files. Use with caution!")
parser.add_argument("--threads", type=int, default=4, help="Number of threads to use for file processing.")
parser.add_argument("--progress", action="store_true", help="Show progress bar.")
parser.add_argument("--logfile", type=str, default=os.path.join(os.path.dirname(sys.argv[0]), "deletion_log.log"), help="Path to the log file.")
parser.add_argument("--verbose", action="store_true", help="Enable verbose mode.")
parser.add_argument("--ignore-errors", action="store_true", help="Continue on errors while processing files.")
args = parser.parse_args()
# Check if logfile already exists
if os.path.exists(args.logfile):
    confirm = input(f"The logfile {args.logfile} already exists. Delete it? (yes/no): ")
    if confirm.lower() == 'yes':
        os.remove(args.logfile)
        print(f"Deleted logfile {args.logfile}.")
    else:
        print("Continuing without deleting the logfile.")
# Setup logging
logging.basicConfig(filename=args.logfile, level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Get the current time
now = datetime.now()
# Function to check if file or dir should be excluded
def is_excluded(file_path):
    for excluded_path in excluded_paths:
        # Check if the file path starts with any of the excluded paths
        if file_path.startswith(excluded_path):
            return True
    return False
# Function to get the file's time considering birthtime or mtime
def get_file_time(file_path):
    # This uses the 'stat' command to get the creation time, which is not available the same way on all Unix systems.
    # The '%W' format asks for the birth time, which may or may not be supported on your system.
    # If '%W' doesn't work, you may need a different approach depending on the filesystem and OS.
    try:
        # Command to get creation time (birth time) on Unix
        output = subprocess.check_output(['stat', '-c', '%W', file_path])
        creation_time = int(output.strip())
        if creation_time > 0:
            return datetime.fromtimestamp(creation_time), 'birthtime'
        else:
            # Fall back to modification time if birth time is 0 or not available
            stat_result = os.stat(file_path)
            return datetime.fromtimestamp(stat_result.st_mtime), 'mtime'
    except Exception as e:
        # Log the error or handle it appropriately
        print(f"An error occurred while fetching file time for {file_path}: {e}")
        # Default to modification time in case of any error
        stat_result = os.stat(file_path)
        return datetime.fromtimestamp(stat_result.st_mtime), 'mtime'
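# Portability note (not part of the original script): 'stat -c %W' requires GNU
# coreutils and a filesystem that records birth times (it prints 0 otherwise).
# On macOS/BSD, os.stat() exposes st_birthtime directly, so a subprocess-free
# variant could try getattr(os.stat(file_path), 'st_birthtime', 0) first and
# fall back to st_mtime exactly as above.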
# Prepare file list and count total files for progress
print("Prepare file list and count total files for progress. Please wait.")
file_paths = []
for root, dirs, files in os.walk(local_path):
    dirs[:] = [d for d in dirs if not is_excluded(os.path.join(root, d))]  # Modify dirs in-place to exclude them
    for name in files:
        full_path = os.path.join(root, name)
        if not is_excluded(full_path):
            file_paths.append(full_path)
total_files = len(file_paths) # Update total_files after exclusion
files_to_process = []
# Initialize progress bar for scanning files
if args.progress:
    progress_iterator = tqdm(total=total_files, desc="Scanning files")
# Walk through all files in the local directory
for file_path in file_paths:
    if is_excluded(file_path):  # Skip excluded files and directories
        continue
    try:
        birth_time, time_used = get_file_time(file_path)  # Gets birthtime or falls back to mtime
        if now - birth_time > timedelta(days=1):
            relative_path = os.path.relpath(file_path, local_path)
            remote_file_path = os.path.join(remote_path, relative_path).replace('\\', '/')
            files_to_process.append(remote_file_path)
            # Log queued files with information on which time attribute was used
            logging.info(f'File queued for deletion: {remote_file_path} (used {time_used})')
    except Exception as exc:
        error_msg = f'Error processing file {file_path}: {exc}'
        if args.ignore_errors:
            logging.error(error_msg)
            if args.verbose:
                print(error_msg)
            continue  # Skip this file and continue processing
        else:
            raise  # Re-raise the exception and stop the script
    if args.progress:
        progress_iterator.update(1)
if args.progress:
    progress_iterator.close()
# Confirmation for deletion
if args.delete:
    confirm = input("Are you sure you want to delete files? This cannot be undone. Type 'yes' to continue: ")
    if confirm.lower() != 'yes':
        print("Deletion not confirmed. Exiting.")
        sys.exit()
# Function to delete a file
def delete_file(remote_file_path):
    if args.delete:
        # Use an argument list (no shell) so file names containing quotes or spaces are passed safely
        delete_command = ['rclone', 'deletefile', remote_file_path]
        try:
            subprocess.run(delete_command, check=True, text=True, capture_output=True)
            logging.info(f'Deleted {remote_file_path}')
            return True
        except subprocess.CalledProcessError as e:
            logging.error(f'Error deleting {remote_file_path}: {e.stderr or e.output}')
            return False
    else:
        logging.info(f'Would delete {remote_file_path}')
        return False
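# Note (suggestion, not part of the original script): 'rclone deletefile' removes a
# single file and leaves any now-empty remote directories in place; if you want those
# cleaned up afterwards, a separate 'rclone rmdirs' pass over the remote path is one option.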
# Processing files
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
    futures = {executor.submit(delete_file, file): file for file in files_to_process}
    for future in tqdm(concurrent.futures.as_completed(futures), total=len(files_to_process), desc="Processing files", disable=not args.progress):
        file = futures[future]
        try:
            success = future.result()
            if args.verbose:
                if success:
                    print(f'Deleted {file}')
                else:
                    print(f'Skipped or failed {file}')
            # No console output if not verbose
        except Exception as exc:
            logging.error(f'{file} generated an exception: {exc}')
            if args.verbose:
                print(f'{file} generated an exception: {exc}')
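A reasonable workflow is to run the script without --delete first, review the "File queued for deletion" and "Would delete" entries in deletion_log.log, and only re-run with --delete once the queued list looks right.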