Last active
October 21, 2025 23:26
-
-
Save ShanaryS/6fbc60327ad5f7043c81e5b1f33da404 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import os | |
| import shutil | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, NewType, Union, Dict, List, Set | |
| import bencodepy # type: ignore | |
| ### ---------- SETUP ---------- ### | |
| # python 3.6+ | |
| # pip install bencodepy | |
| ### ---------- SETUP ---------- ### | |
| ### ---------- INSTRUCTIONS ---------- ### | |
| ### ---------- INSTRUCTIONS ---------- ### | |
| ### ---------- INSTRUCTIONS ---------- ### | |
| # This script is entirely optional and not necessary for cross-seed v6. | |
| # If you have an exotic setup, this script may not work for you. | |
| # If DRY_RUN gives no errors, it should be able to run without issues. | |
| # ETA ~30s per 1k moved torrents | |
| # This script will convert all torrents with the cross-seed tag in client | |
| # to link at LINK_DIR/TrakerName, as v6 `flatLinking: false` does. | |
| # Torrent with save paths already in LINK_DIR as LINK_DIR/tracker will be ignored. | |
| # WARNING: You will likely need to clean up orphaned files using qbit_manage as this script | |
| # does not remove files, only creates links to the new location. | |
| # Please verify you can do this before running this script. | |
| # 1. Populate ONLY the options in the CONFIG section below according to your cross-seed setup | |
| # 2. Ensure you will run the script on the same OS as the qBittorrent client | |
| # 3. Ensure the paths that qBittorrent uses are accessible to this script, see SCRIPT_MOUNT and QBIT_MOUNT in CONFIG | |
| # 4. STOP qBittorrent and make a manual backup of the BT_backup folder (this script will backup as well) | |
| # 5. Run the script with DRY_RUN = True and check the console output for a few paths to ensure they are correct | |
| # 6. Run the script with DRY_RUN = False to create the links | |
| # 7. Start qBittorrent and check if any torrents are in MISSING_FILES state, check the category and paths of a few of the changed torrents, pause and recheck a couple for verification | |
| # 8. If anything is wrong, STOP qBittorrent, replace the BT_backup folder with the backup, and start qBittorrent. Contact the cross-seed discord for support | |
| # 9. If everything is correct, you will NEED to run qbit_manage to clean up the orphaned files. This script does not remove files, only creates links to the new location | |
| ### ---------- END INSTRUCTIONS ---------- ### | |
| ### ---------- END INSTRUCTIONS ---------- ### | |
| ### ---------- END INSTRUCTIONS ---------- ### | |
| ### ---------- CONFIG ---------- | |
| ### ---------- CONFIG ---------- | |
| ### ---------- CONFIG ---------- | |
| # Set to True to only log what would be done | |
| DRY_RUN = True | |
| # This must be the path from the prespective of this script | |
| # It may differ from the path you have in cross-seed | |
| # QUIT qBittorrent and BACKUP THIS FOLDER before running this script | |
| TORRENT_DIR = "" | |
| LINK_TYPE = "hardlink" | |
| LINK_CATEGORY = "cross-seed-link" | |
| DUPLICATE_CATEGORIES = False | |
| # This must be the path from the prespective of this script | |
| # It may differ from the path you have in cross-seed | |
| # If the script says lots of files are not found, this may be the cause | |
| LINK_DIR = "" | |
| # If qbit is in docker with different volume paths than this script sees, | |
| # set these to the volume mount for your qbit download folder. | |
| # e.g if the volume is: /path/outside/docker:/path/qbit/sees | |
| # then: SCRIPT_MOUNT = "/path/outside/docker" and QBIT_MOUNT = "/path/qbit/sees" | |
| # If the mounts are the same, e.g the volume is: /path:/path or /a/b/c:/a/b/c | |
| # then leave these as empty strings: SCRIPT_MOUNT = "" and QBIT_MOUNT = "" | |
| # In any case, run this script with DRY_RUN = True, it will error if anything is wrong | |
| # If the script says lots of files are not found, this may be the cause | |
| SCRIPT_MOUNT = "" | |
| QBIT_MOUNT = "" | |
| # The key is the tracker url or announce id, the value is the folder name | |
| # The url is for the tracker, NOT the website. View the tracker urls for a torrent in client to find the correct url | |
| # You don't need the entire url, just a unique part of it | |
| # Any torrents without one of these will be ignored | |
| # Two items are prefiled with example tracker url and announce id | |
| TRACKER_URL_NAME = { | |
| "example.com": "Example", | |
| "fewafjewoaufeiowajpofe": "TrackerName", | |
| } | |
| ### ---------- END CONFIG ---------- | |
| ### ---------- END CONFIG ---------- | |
| ### ---------- END CONFIG ---------- | |
| # types | |
| FastResume = NewType("FastResume", Dict[bytes, Any]) # OrderedDict[bytes, Any] | |
| Torrent = NewType("Torrent", Dict[bytes, Any]) # OrderedDict[bytes, Any] | |
| PathScript = NewType("PathScript", Path) | |
| PathQbit = NewType("PathQbit", Path) | |
| assert sys.version_info >= (3, 6), f'Python 3.6+ required, current version: {sys.version_info}' | |
| assert isinstance(DRY_RUN, bool), 'DRY_RUN must be a boolean: DRY_RUN = True or DRY_RUN = False' | |
| assert isinstance(TORRENT_DIR, str), 'TORRENT_DIR must be a string like: TORRENT_DIR = "/path/to/folder"' | |
| TORRENT_DIR_PATH = PathScript(Path(TORRENT_DIR)) | |
| assert TORRENT_DIR_PATH.exists() and TORRENT_DIR_PATH.is_dir(), f'TORRENT_DIR does not exist or is not a folder: {TORRENT_DIR}' | |
| assert LINK_TYPE in ["hardlink", "symlink"], 'LINK_TYPE must be either "hardlink" or "symlink": LINK_TYPE = "hardlink"' | |
| assert isinstance(LINK_CATEGORY, str), 'LINK_CATEGORY must be a string like: LINK_CATEGORY = "xseed"' | |
| assert isinstance(DUPLICATE_CATEGORIES, bool), 'DUPLICATE_CATEGORIES must be a boolean: DUPLICATE_CATEGORIES = True or DUPLICATE_CATEGORIES = False' | |
| assert isinstance(LINK_DIR, str), 'LINK_DIR must be a string like: LINK_DIR = "/path/to/folder"' | |
| LINK_DIR_PATH = PathScript(Path(LINK_DIR)) | |
| assert Path(LINK_DIR_PATH).exists() and Path(LINK_DIR_PATH).is_dir(), f'LINK_DIR does not exist or is not a folder: {LINK_DIR}' | |
| assert isinstance(SCRIPT_MOUNT, str) and isinstance(QBIT_MOUNT, str), 'SCRIPT_MOUNT and QBIT_MOUNT must be strings, set to "" to disable' | |
| assert SCRIPT_MOUNT.endswith("/") == QBIT_MOUNT.endswith("/"), 'SCRIPT_MOUNT and QBIT_MOUNT must both end with "/" or neither' | |
| assert len(TRACKER_URL_NAME) > 0, 'TRACKER_URL_NAME must be a dictionary like: TRACKER_URL_NAME = {"https://tracker.url": "tracker_name"}' | |
| # Logging | |
| LOG_NAME = f"{Path(__file__).stem}.log" | |
| LOG_FORMAT_DATE = "%Y-%m-%d %H:%M:%S"; LOG_FORMAT_STREAM = "%(asctime)s.%(msecs)03d %(levelname)s: %(message)s"; LOG_FORMAT_FILE = "%(asctime)s.%(msecs)03d %(levelname)s: %(message)s" | |
| LOG_HANDLER_STREAM = logging.StreamHandler(sys.stdout) | |
| LOG_HANDLER_FILE = logging.FileHandler(LOG_NAME) | |
| LOG_HANDLER_STREAM.setFormatter(logging.Formatter(LOG_FORMAT_STREAM, LOG_FORMAT_DATE)); LOG_HANDLER_FILE.setFormatter(logging.Formatter(LOG_FORMAT_FILE, LOG_FORMAT_DATE)) | |
| log = logging.getLogger(LOG_NAME) | |
| log.setLevel(logging.DEBUG); LOG_HANDLER_STREAM.setLevel(logging.DEBUG); LOG_HANDLER_STREAM.setLevel(logging.DEBUG) | |
| log.addHandler(LOG_HANDLER_STREAM) | |
| log.addHandler(LOG_HANDLER_FILE) | |
| # Backup TORRENT_DIR | |
| TORRENT_DIR_ORIG = TORRENT_DIR_PATH.with_name(TORRENT_DIR_PATH.name + "_orig") | |
| assert not TORRENT_DIR_ORIG.exists(), f'TORRENT_DIR_ORIG already exists, delete it before running this script: {TORRENT_DIR_ORIG}' | |
| log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Backing up TORRENT_DIR to TORRENT_DIR_ORIG: {TORRENT_DIR_ORIG}") | |
| if not DRY_RUN: | |
| shutil.copytree(TORRENT_DIR_PATH, TORRENT_DIR_ORIG) | |
| assert TORRENT_DIR_ORIG.exists() and TORRENT_DIR_ORIG.is_dir(), f'Failed to copy TORRENT_DIR to TORRENT_DIR_ORIG: {TORRENT_DIR_ORIG}' | |
| assert len(list(TORRENT_DIR_ORIG.glob("*"))) == len(list(TORRENT_DIR_PATH.glob("*"))), 'Failed to copy all files from TORRENT_DIR to TORRENT_DIR_ORIG' | |
| CROSS_SEED_TAG = b"cross-seed" | |
| TORRENT_CATEGORY_SUFFIX = ".cross-seed" # Not bytes | |
| ORIGINAL_LAYOUT = b"Original" | |
| SUBFOLDER_LAYOUT = b"Subfolder" | |
| NO_SUBFOLDER_LAYOUT = b"NoSubfolder" | |
| log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Processing cross-seed torrents in TORRENT_DIR: {TORRENT_DIR}") | |
| total_cross_seeds: int = 0 | |
| total_linked: int = 0 | |
| for fastresume_file in TORRENT_DIR_PATH.glob("*.fastresume"): | |
| fastresume: FastResume = bencodepy.decode_from_file(fastresume_file) # type: ignore | |
| qbt_tags: List[bytes] = fastresume[b'qBt-tags'] # type: ignore | |
| if CROSS_SEED_TAG not in qbt_tags: | |
| continue | |
| total_cross_seeds += 1 | |
| tracker_name: Union[str, None] = None | |
| trackers: List[List[bytes]] = fastresume[b'trackers'] # type: ignore | |
| for tracker_urls_b in trackers: # type: ignore | |
| for tracker_url_b in tracker_urls_b: # type: ignore | |
| tracker_url = tracker_url_b.decode().lower() # type: ignore | |
| for url, name in TRACKER_URL_NAME.items(): | |
| if url in tracker_url: | |
| tracker_name = name | |
| break | |
| if tracker_name: | |
| break | |
| if not tracker_name: | |
| continue | |
| old_save_path: PathQbit = PathQbit(Path(fastresume[b'qBt-savePath'].decode()) if fastresume.get(b'qBt-savePath') else Path(fastresume[b'save_path'].decode())) # type: ignore | |
| old_save_path_script: PathScript = PathScript(old_save_path) | |
| link_dir_qbit: PathQbit = PathQbit(LINK_DIR_PATH) | |
| mounts_are_equal: bool = True | |
| if old_save_path.parts[:2] != link_dir_qbit.parts[:2]: | |
| mounts_are_equal = False | |
| if not SCRIPT_MOUNT or not QBIT_MOUNT: | |
| log.critical(f'FATAL ERROR: You must set SCRIPT_MOUNT and QBIT_MOUNT - Drive mismatch between save_path and LINK_DIR for {fastresume_file} (see instructions step #3): {old_save_path} --> {link_dir_qbit}') # type: ignore | |
| exit(1) | |
| qbit_root: str = str(Path(*old_save_path.parts[:2])) | |
| new_link_root: str = str(Path(*Path(str(link_dir_qbit).replace(SCRIPT_MOUNT, QBIT_MOUNT, 1)).parts[:2])) | |
| old_save_path_script = PathScript(Path(str(old_save_path).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| link_dir_qbit = PathQbit(Path(str(link_dir_qbit).replace(SCRIPT_MOUNT, QBIT_MOUNT, 1))) | |
| assert qbit_root == new_link_root, f'FATAL ERROR: SCRIPT_MOUNT = "{SCRIPT_MOUNT}" or QBIT_MOUNT = "{QBIT_MOUNT}" is invalid - Drive mismatch between save_path and LINK_DIR for {fastresume_file} (see instructions step #3): {old_save_path} --> {link_dir_qbit}' # type: ignore | |
| new_save_path_script = LINK_DIR_PATH/tracker_name | |
| if old_save_path_script.resolve() == new_save_path_script.resolve(): | |
| continue | |
| new_save_path = PathQbit(link_dir_qbit/tracker_name) | |
| torrent_hash: str = fastresume_file.stem # qbit uses v2 hash trunctated to 40 chars if available | |
| torrent_file = TORRENT_DIR_PATH/f"{torrent_hash}.torrent" | |
| if not torrent_file.exists(): | |
| log.error(f"Torrent file not found: {torrent_file}") | |
| continue | |
| torrent: Torrent = bencodepy.decode_from_file(torrent_file) # type: ignore | |
| info_dict: Dict[bytes, Any] = torrent[b'info'] # type: ignore | |
| info_name: bytes = info_dict[b'name'] # type: ignore | |
| torrent_name: str = info_name.decode() | |
| log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}{torrent_name} [{torrent_hash}]: {old_save_path} --> {new_save_path}") | |
| info_files: Union[List[Dict[bytes, Any]], None] = info_dict.get(b'files') # type: ignore | |
| qbt_content_layout: bytes = fastresume[b'qBt-contentLayout'] # type: ignore | |
| # Name is "" if multi file torrent without root folder, Replace empty path segments with "_" by convention, qbit removes / from name | |
| # Implementation source from: https://github.com/qbittorrent/qBittorrent/blob/eba5cbb8039d4766daaa51ed4e16f867af3505ac/src/base/bittorrent/sessionimpl.cpp#L2759 | |
| file_paths: List[Path] = [] | |
| subfolder: str | |
| root_folder: Union[str, None] = None | |
| if not info_files: | |
| file_path = Path(info_name.decode().replace("/", "")) # qbit removes / from name # type: ignore | |
| file_paths.append(file_path) | |
| subfolder = file_path.stem | |
| if len(file_path.parts) > 1: | |
| root_folder = file_path.parts[0] | |
| else: | |
| for file_info in info_files: # type: ignore | |
| file_paths.append(Path(info_name.decode().replace("/", ""))/Path(*[p.decode() or "_" for p in file_info[b'path']])) # type: ignore | |
| subfolder = Path(*[p.decode() or "_" for p in info_files[0][b'path']]).stem # type: ignore | |
| for file_info in info_files: # type: ignore | |
| file_path = Path(info_name.decode().replace("/", ""))/Path(*[p.decode() or "_" for p in file_info[b'path']]) # type: ignore | |
| if len(file_path.parts) <= 1: | |
| root_folder = None | |
| break | |
| if root_folder is None: | |
| root_folder = file_path.parts[0] | |
| elif root_folder != file_path.parts[0]: | |
| root_folder = None | |
| break | |
| full_paths_map: Dict[PathQbit, PathQbit] = {} | |
| for file_path in file_paths: | |
| if qbt_content_layout == ORIGINAL_LAYOUT: | |
| full_paths_map[old_save_path/file_path] = new_save_path/file_path | |
| elif qbt_content_layout == SUBFOLDER_LAYOUT: | |
| if root_folder: full_paths_map[old_save_path/file_path] = new_save_path/file_path | |
| else: full_paths_map[old_save_path/subfolder/file_path] = new_save_path/file_path | |
| elif qbt_content_layout == NO_SUBFOLDER_LAYOUT: | |
| if root_folder: full_paths_map[old_save_path/Path(*file_path.parts[1:])] = new_save_path/file_path | |
| else: full_paths_map[old_save_path/file_path] = new_save_path/file_path | |
| else: | |
| log.error(f"Unknown content layout: {qbt_content_layout.decode()}") # type: ignore | |
| raise ValueError(f"Unknown content layout: {qbt_content_layout.decode()}") # type: ignore | |
| skip: bool = False | |
| for src, dest in full_paths_map.items(): | |
| if mounts_are_equal: | |
| src_script: PathScript = PathScript(src) | |
| dest_script: PathScript = PathScript(dest) | |
| else: | |
| src_script: PathScript = PathScript(Path(str(src).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| if not src_script.exists(): | |
| log.error(f"--- Source path does not exist") | |
| log.debug(f"------ Path as QBIT sees: {src} --> {dest}") | |
| log.debug(f"------ Path as SCRIPT sees: {src_script} --> {dest_script}") | |
| skip = True | |
| continue | |
| log.info(f"--- {src} --> {dest}") | |
| if skip: | |
| continue | |
| total_linked += 1 | |
| if DRY_RUN: | |
| continue | |
| already_exists: Set[PathScript] = set() | |
| try: | |
| for src, dest in full_paths_map.items(): | |
| if mounts_are_equal: | |
| src_script: PathScript = PathScript(src) | |
| dest_script: PathScript = PathScript(dest) | |
| else: | |
| src_script: PathScript = PathScript(Path(str(src).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| if dest_script.exists(): | |
| log.warning(f"--- Destination path already exists") | |
| log.debug(f"------ Path as QBIT sees: {src} --> {dest}") | |
| log.debug(f"------ Path as SCRIPT sees: {src_script} --> {dest_script}") | |
| already_exists.add(dest_script) | |
| continue | |
| dest_script.parent.mkdir(parents=True, exist_ok=True) | |
| if LINK_TYPE == "hardlink": | |
| os.link(src_script, dest_script) | |
| else: | |
| os.symlink(src_script, dest_script) | |
| except Exception as e: | |
| log.exception(e) | |
| for dest in full_paths_map.values(): | |
| if mounts_are_equal: | |
| dest_script: PathScript = PathScript(dest) | |
| else: | |
| dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1))) | |
| if dest_script in already_exists: | |
| continue | |
| if dest_script.exists(): | |
| log.info(f"------ Removing created link: {dest_script}") | |
| dest_script.unlink(missing_ok=True) | |
| continue | |
| new_save_path_b: bytes = str(new_save_path).encode() | |
| fastresume[b'save_path'] = new_save_path_b | |
| fastresume[b'qBt-savePath'] = new_save_path_b # Disables autoTMM https://github.com/qbittorrent/qBittorrent/blob/eba5cbb8039d4766daaa51ed4e16f867af3505ac/src/base/bittorrent/bencoderesumedatastorage.cpp#L238 | |
| fastresume[b'qBt-downloadPath'] = new_save_path_b | |
| fastresume[b'qBt-contentLayout'] = ORIGINAL_LAYOUT | |
| if DUPLICATE_CATEGORIES: | |
| category: str = fastresume.get(b'qBt-category', b"").decode() # type: ignore | |
| if category and category != LINK_CATEGORY and not category.endswith(TORRENT_CATEGORY_SUFFIX): # type: ignore | |
| cat_tag: bytes = f"{category}{TORRENT_CATEGORY_SUFFIX}".encode() | |
| if not cat_tag in qbt_tags: | |
| qbt_tags.append(cat_tag) # type: ignore | |
| fastresume[b'qBt-category'] = LINK_CATEGORY.encode() | |
| fastresume.pop(b'mapped_files', None) # This is used if the files have been renamed, but we are using ORIGINAL_LAYOUT https://libtorrent.org/manual-ref.html#fast-resume # type: ignore | |
| with open(fastresume_file, "wb") as f: | |
| f.write(bencodepy.encode(fastresume)) # type: ignore | |
| log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Total cross-seed torrents: {total_cross_seeds}") | |
| log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Total newly linked torrents: {total_linked}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment