Skip to content

Instantly share code, notes, and snippets.

@NooNameR
Last active May 18, 2025 08:21
Show Gist options
  • Save NooNameR/aca7968d590be6111cc9eca0b35f48cb to your computer and use it in GitHub Desktop.
Unraid mover script with qBittorrent integration: pauses affected torrents, migrates aged files while preserving hardlinks, then resumes the torrents.
import os
import shutil
import time
import sys
import logging
import subprocess
from qbit_helper import QbitHelper, QbitHelpers
def is_file_within_age_range(current_time: float, filepath: str, min_days: int, max_days: int) -> bool:
"""
Check if the file's modification time is within the specified age range.
:param filepath: Path to the file
:param min_days: Minimum file age in days
:param max_days: Maximum file age in days
:return: True if the file's age is within the range, False otherwise
"""
file_mtime = os.stat(filepath).st_mtime
file_age_days = (current_time - file_mtime) / (60 * 60 * 24) # Convert seconds to days
return min_days <= file_age_days <= max_days
# Example usage
# run_rsync("/path/source/", "/path/destination/", extra_args=["--ignore-existing"])
def run_rsync(src: str, dest: str, extra_args=None):
    """
    Run rsync in archive mode from src to dest, streaming its stdout and
    stderr into the log line by line.

    Example:
        run_rsync("/path/source/", "/path/destination/", extra_args=["--ignore-existing"])

    :param src: source path (trailing-slash semantics are rsync's own)
    :param dest: destination path
    :param extra_args: optional list of extra rsync arguments, inserted
                       before the source/destination pair
    :return: rsync's exit code (0 on success)
    """
    args = ["rsync", "-a"]
    if extra_args:
        args.extend(extra_args)
    args.extend([src, dest])
    # argv list (shell=False): src/dest cannot be shell-injected.
    result = subprocess.run(args, capture_output=True, text=True)
    if result.stdout:
        for line in result.stdout.splitlines():
            logging.info("[rsync stdout] %s", line)
    if result.stderr:
        for line in result.stderr.splitlines():
            logging.error("[rsync stderr] %s", line)
    if result.returncode != 0:
        logging.error("rsync failed with return code %d", result.returncode)
    # Improvement: surface the exit code — the original returned None,
    # leaving callers no way to detect a failed transfer programmatically.
    return result.returncode
def copy_file_with_metadata(src_file, dest_file):
    """
    Copy src_file to dest_file, preserving timestamps/permissions via
    shutil.copy2 and best-effort preserving ownership.

    Ownership changes usually require elevated privileges, so a
    PermissionError is logged rather than propagated.
    """
    shutil.copy2(src_file, dest_file)
    stat_info = os.stat(src_file)
    try:
        os.chown(dest_file, stat_info.st_uid, stat_info.st_gid)
    except PermissionError as err:
        logging.error("Unable to preserve ownership for %s. Requires elevated privileges. %s", dest_file, err)
def migrate_files(qbit: "QbitHelpers", args) -> list[str]:
    """
    Migrate files from the configured source shares to the destination,
    recreating hardlinks at the destination based on the source's inode
    structure. Only files within the configured age range are moved.

    :param qbit: helper used to pause torrents referencing a file before
                 it is copied (forward-ref annotation: QbitHelpers is
                 defined in a sibling module)
    :param args: parsed CLI arguments; reads source, destination, share,
                 ignore, min_days, max_days and dry_run
    :return: list of source file paths that were successfully processed
    """
    import fnmatch

    current_time = time.time()
    inode_to_dest_file = {}   # inode -> destination path (hardlink target)
    processed_files = []      # successfully processed source files

    for share_path in [os.path.join(args.source, share) for share in set(args.share) if share]:
        logging.info("Scanning %s...", share_path)
        for root, _, files in os.walk(share_path):
            # Determine relative path from source to current directory.
            rel_path = os.path.relpath(root, args.source)
            dest_dir = os.path.join(args.destination, rel_path)

            # Skip orphaned/recycled (or otherwise ignored) directories.
            if any(fnmatch.fnmatch(rel_path, pattern) for pattern in args.ignore):
                logging.info("Skipping directory: %s, matched ignored", root)
                continue

            for file in files:
                src_file = os.path.join(root, file)
                dest_file = os.path.join(dest_dir, file)

                inode = os.stat(src_file).st_ino

                # Age-gate only the first file of an inode group: once one
                # link of an inode has been migrated, its siblings must
                # follow so the hardlink structure stays intact.
                if inode not in inode_to_dest_file and not is_file_within_age_range(current_time, src_file, args.min_days, args.max_days):
                    logging.debug("Skipping file (out of age range): %s", src_file)
                    continue

                if args.dry_run:
                    logging.info("Skipping file processing: %s", src_file)
                    continue

                # Skip files already at the destination with the same size.
                if os.path.exists(dest_file):
                    src_stat = os.stat(src_file)
                    dest_stat = os.stat(dest_file)
                    if src_stat.st_size == dest_stat.st_size:
                        logging.info("Skipping existing file: %s", dest_file)
                        inode_to_dest_file[inode] = dest_file  # reuse as hardlink target
                        processed_files.append(src_file)
                        continue

                if not os.path.exists(dest_dir):
                    logging.info("Creating directory: %s", dest_dir)
                    os.makedirs(dest_dir, exist_ok=True)
                    logging.info("Created directory: %s", dest_dir)
                    # Mirror the source directory's ownership onto the new
                    # destination directory (best-effort: needs privileges).
                    try:
                        logging.debug("Getting permissions from source directory: %s", root)
                        src_stat = os.stat(root)
                        logging.debug("Setting permissions: [%s:%s] to %s", src_stat.st_uid, src_stat.st_gid, dest_dir)
                        os.chown(dest_dir, src_stat.st_uid, src_stat.st_gid)
                        logging.info("Set permissions [%s:%s] for destination directory: %s", src_stat.st_uid, src_stat.st_gid, dest_dir)
                    except PermissionError as e:
                        logging.error("Unable to set ownership for %s. %s", dest_file, e)

                if inode in inode_to_dest_file:
                    # Inode already migrated: recreate the hardlink.
                    # BUG FIX: look up the target BEFORE logging it — the
                    # original logged `existing_dest_file` one line before
                    # assigning it, raising NameError on every hardlink.
                    existing_dest_file = inode_to_dest_file[inode]
                    logging.info("Hardlinking: %s -> %s", dest_file, existing_dest_file)
                    os.link(existing_dest_file, dest_file)
                    logging.info("Hardlinked: %s -> %s", dest_file, existing_dest_file)
                else:
                    # First sighting of this inode: pause any torrent that
                    # references the file, then copy it across.
                    logging.info("Copying: %s -> %s", src_file, dest_file)
                    qbit.pause(src_file)
                    # run_rsync(src_file, dest_file)
                    copy_file_with_metadata(src_file, dest_file)
                    inode_to_dest_file[inode] = dest_file
                    logging.info("Copied: %s -> %s", src_file, dest_file)

                if os.path.exists(dest_file):
                    processed_files.append(src_file)

    return processed_files
def delete_source_files(processed_files: list[str]) -> int:
    """
    Delete source files that were successfully migrated, then prune any
    directories left empty beneath the first file's parent directory.

    :param processed_files: list of successfully processed source files
    :return: total number of bytes freed by the deletions
    """
    freed = 0
    if not processed_files:
        return freed

    for path in processed_files:
        try:
            size = os.path.getsize(path)
            logging.info("Deleting source file: %s", path)
            os.remove(path)
            logging.info("Deleted source file: %s", path)
            freed += size
        except Exception as e:
            logging.error("Failed to delete %s: %s", path, e)

    # Prune empty directories bottom-up. NOTE(review): only the tree under
    # the FIRST processed file's parent is walked — empty directories under
    # other shares are left alone; confirm that is intended.
    start = os.path.dirname(processed_files[0])
    for parent, subdirs, _ in os.walk(start, topdown=False):
        for name in subdirs:
            candidate = os.path.join(parent, name)
            if not os.listdir(candidate):  # directory is empty
                logging.info("Removing empty directory: %s", candidate)
                os.rmdir(candidate)
                logging.info("Removed empty directory: %s", candidate)

    return freed
def format_bytes_to_gib(size_bytes: int) -> str:
    """Render a byte count as a human-readable GiB string, e.g. '1.50 GiB'."""
    return "{:.2f} GiB".format(size_bytes / 2 ** 30)
if __name__ == "__main__":
    import argparse

    # CLI: source/destination plus age window, qBittorrent credentials,
    # and share/ignore selection.
    parser = argparse.ArgumentParser(description="Migrate files and preserve hardlinks, only moving files within a specific age range. Deletes source files after successful migration.")
    parser.add_argument("source", help="Path to the source directory")
    parser.add_argument("destination", help="Path to the destination directory")
    parser.add_argument("--min_days", type=int, help="Minimum file age in days", default=30)
    parser.add_argument("--max_days", type=int, help="Maximum file age in days", default=365)
    parser.add_argument("--host", nargs='*', help="qbittorrent host including port", default=[])
    parser.add_argument("--user", help="qbittorrent user", default="admin")
    parser.add_argument("--password", help="qbittorrent password", default="adminadmin")
    # NOTE(review): --uid/--gid are parsed but never read below — ownership
    # is mirrored from the source tree instead; confirm whether these flags
    # are still meant to be used.
    parser.add_argument("--uid", type=int, help="User ID for new directory", default=99)
    parser.add_argument("--gid", type=int, help="Group ID for new directory", default=100)
    parser.add_argument("--share", nargs='*', help="Shares to include", default=["data"])
    parser.add_argument(
        "--ignore",
        nargs='*',
        default=["**/.Orphaned", "**/.RecycleBin"],
        help="List of directory paths (relative to source) to ignore"
    )
    parser.add_argument("--dry_run", help="Match and print, but not move files", action="store_true", default=False)
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format=("[DRY RUN] " if args.dry_run else "") + "%(asctime)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        handlers=[logging.StreamHandler(sys.stdout)],
    )

    startingtotal, startingused, startingfree = shutil.disk_usage(args.source)
    qbit = QbitHelpers([QbitHelper(host, args.user, args.password, args.source) for host in args.host if host])
    try:
        processed_files = migrate_files(qbit, args)
        emptiedspace = delete_source_files(processed_files)
        _, _, ending_free = shutil.disk_usage(args.source)
        logging.info("Migration and hardlink recreation completed successfully from '%s' to '%s'", args.source, args.destination)
        logging.info("Starting free space: %s -- Ending free space: %s", format_bytes_to_gib(startingfree), format_bytes_to_gib(ending_free))
        logging.info("FREED UP %s TOTAL SPACE", format_bytes_to_gib(emptiedspace))
    except Exception as e:
        # BUG FIX: the original had an additional `except IndexError` branch
        # with a body identical to this one; IndexError is an Exception
        # subclass, so this single handler covers it — the duplicate was dead.
        logging.error("Error: %s", e)
    finally:
        # Always resume paused torrents, even if migration failed part-way.
        qbit.resume()
#!/usr/bin/env python3
# This standalone script is used to pause torrents older than last x days,
# run mover (in Unraid) and start torrents again once completed
import os
import sys
import time
import logging
# qbittorrent-api is a hard requirement for this helper module; fail fast
# with an actionable message instead of a bare ImportError traceback.
try:
    from qbittorrentapi import APIConnectionError
    from qbittorrentapi import Client
    from qbittorrentapi import LoginFailed
except ModuleNotFoundError:
    print('Requirements Error: qbittorrent-api not installed. Please install using the command "pip install qbittorrent-api"')
    sys.exit(1)
class QbitHelper:
    """
    Wraps one qBittorrent client connection and tracks the completed
    torrents whose content lives under a given root path, so they can be
    paused before their files are moved and resumed afterwards.
    """

    def __init__(self, host: str, user: str, password: str, root: str):
        """
        Connect to qBittorrent and cache the completed torrents whose
        content exists under `root`.

        :param host: qBittorrent host including port
        :param user: qBittorrent username
        :param password: qBittorrent password
        :param root: local root path where torrent content is stored
        :raises RuntimeError: if login fails or the client is unreachable
        """
        try:
            client = Client(host=host, username=user, password=password)
        except LoginFailed as e:
            # BUG FIX: the original `raise ("...")` raised a plain string,
            # which itself raises TypeError ("exceptions must derive from
            # BaseException"); raise a real exception and chain the cause.
            raise RuntimeError("Qbittorrent Error: Failed to login. Invalid username/password.") from e
        except APIConnectionError as e:
            raise RuntimeError("Qbittorrent Error: Unable to connect to the client.") from e
        except Exception as e:
            raise RuntimeError("Qbittorrent Error: Unable to connect to the client.") from e
        self.root = root
        self.torrents = self.__filter(client.torrents.info(status_filter="completed", sort="added_on", reverse=True))
        # BUG FIX: dropped the stray f-prefix — the %-style placeholders are
        # filled lazily by logging, not by an f-string.
        logging.info("Found %d in the source: %s", len(self.torrents), self.root)
        self.paused_torrents = []

    def resume(self):
        """Resume every torrent previously paused via pause()."""
        for torrent in self.paused_torrents:
            logging.info("[%s] Resuming torrent: %s [%d] -> %s", torrent.hash, torrent.name, torrent.added_on, torrent.content_path)
            torrent.resume()

    def __cache_path(self, torrent) -> str:
        # Map the torrent's content_path onto the local root directory.
        return os.path.join(self.root, torrent.content_path.lstrip(os.sep))

    def __filter(self, torrents):
        # Keep only torrents whose content actually exists under self.root.
        return [torrent for torrent in torrents if os.path.exists(self.__cache_path(torrent))]

    def __has_file(self, torrent, path: str) -> bool:
        # True when `path` is the torrent's content file or lies inside its
        # content directory (compared by samefile, so hardlinks match too).
        content_path = self.__cache_path(torrent)
        if os.path.isdir(content_path):
            for walk_root, _, files in os.walk(content_path):
                for file in files:
                    if os.path.samefile(os.path.join(walk_root, file), path):
                        return True
            return False
        return os.path.samefile(content_path, path)

    def pause(self, path: str):
        """Pause every tracked torrent referencing `path` and remember it for resume()."""
        for torrent in self.torrents:
            if self.__has_file(torrent, path):
                logging.info("[%s] Pausing torrent: %s [%d] -> %s", torrent.hash, torrent.name, torrent.added_on, torrent.content_path)
                torrent.pause()
                self.paused_torrents.append(torrent)
        # Drop paused torrents from the active list so later pause() calls
        # do not re-scan them.
        self.torrents = [t for t in self.torrents if t not in self.paused_torrents]
class QbitHelpers:
    """Fans pause/resume calls out to a collection of QbitHelper instances."""

    def __init__(self, qbits: list[QbitHelper]):
        # Helpers to delegate to; may be empty when no hosts were supplied.
        self.qbits = qbits

    def pause(self, path: str):
        """Ask every helper to pause torrents that reference `path`."""
        for helper in self.qbits:
            helper.pause(path)

    def resume(self):
        """Ask every helper to resume the torrents it previously paused."""
        for helper in self.qbits:
            helper.resume()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment