Skip to content

Instantly share code, notes, and snippets.

@ShanaryS
Last active October 21, 2025 23:26
Show Gist options
  • Select an option

  • Save ShanaryS/6fbc60327ad5f7043c81e5b1f33da404 to your computer and use it in GitHub Desktop.

Select an option

Save ShanaryS/6fbc60327ad5f7043c81e5b1f33da404 to your computer and use it in GitHub Desktop.
import logging
import os
import shutil
import sys
from pathlib import Path
from typing import Any, NewType, Union, Dict, List, Set
import bencodepy # type: ignore
### ---------- SETUP ---------- ###
# python 3.6+
# pip install bencodepy
### ---------- SETUP ---------- ###
### ---------- INSTRUCTIONS ---------- ###
### ---------- INSTRUCTIONS ---------- ###
### ---------- INSTRUCTIONS ---------- ###
# This script is entirely optional and not necessary for cross-seed v6.
# If you have an exotic setup, this script may not work for you.
# If DRY_RUN gives no errors, it should be able to run without issues.
# ETA ~30s per 1k moved torrents
# This script will convert all torrents with the cross-seed tag in client
# to link at LINK_DIR/TrakerName, as v6 `flatLinking: false` does.
# Torrent with save paths already in LINK_DIR as LINK_DIR/tracker will be ignored.
# WARNING: You will likely need to clean up orphaned files using qbit_manage as this script
# does not remove files, only creates links to the new location.
# Please verify you can do this before running this script.
# 1. Populate ONLY the options in the CONFIG section below according to your cross-seed setup
# 2. Ensure you will run the script on the same OS as the qBittorrent client
# 3. Ensure the paths that qBittorrent uses are accessible to this script, see SCRIPT_MOUNT and QBIT_MOUNT in CONFIG
# 4. STOP qBittorrent and make a manual backup of the BT_backup folder (this script will backup as well)
# 5. Run the script with DRY_RUN = True and check the console output for a few paths to ensure they are correct
# 6. Run the script with DRY_RUN = False to create the links
# 7. Start qBittorrent and check if any torrents are in MISSING_FILES state, check the category and paths of a few of the changed torrents, pause and recheck a couple for verification
# 8. If anything is wrong, STOP qBittorrent, replace the BT_backup folder with the backup, and start qBittorrent. Contact the cross-seed discord for support
# 9. If everything is correct, you will NEED to run qbit_manage to clean up the orphaned files. This script does not remove files, only creates links to the new location
### ---------- END INSTRUCTIONS ---------- ###
### ---------- END INSTRUCTIONS ---------- ###
### ---------- END INSTRUCTIONS ---------- ###
### ---------- CONFIG ----------
### ---------- CONFIG ----------
### ---------- CONFIG ----------
# Set to True to only log what would be done
DRY_RUN = True
# This must be the path from the prespective of this script
# It may differ from the path you have in cross-seed
# QUIT qBittorrent and BACKUP THIS FOLDER before running this script
TORRENT_DIR = ""
LINK_TYPE = "hardlink"
LINK_CATEGORY = "cross-seed-link"
DUPLICATE_CATEGORIES = False
# This must be the path from the prespective of this script
# It may differ from the path you have in cross-seed
# If the script says lots of files are not found, this may be the cause
LINK_DIR = ""
# If qbit is in docker with different volume paths than this script sees,
# set these to the volume mount for your qbit download folder.
# e.g if the volume is: /path/outside/docker:/path/qbit/sees
# then: SCRIPT_MOUNT = "/path/outside/docker" and QBIT_MOUNT = "/path/qbit/sees"
# If the mounts are the same, e.g the volume is: /path:/path or /a/b/c:/a/b/c
# then leave these as empty strings: SCRIPT_MOUNT = "" and QBIT_MOUNT = ""
# In any case, run this script with DRY_RUN = True, it will error if anything is wrong
# If the script says lots of files are not found, this may be the cause
SCRIPT_MOUNT = ""
QBIT_MOUNT = ""
# The key is the tracker url or announce id, the value is the folder name
# The url is for the tracker, NOT the website. View the tracker urls for a torrent in client to find the correct url
# You don't need the entire url, just a unique part of it
# Any torrents without one of these will be ignored
# Two items are prefiled with example tracker url and announce id
TRACKER_URL_NAME = {
"example.com": "Example",
"fewafjewoaufeiowajpofe": "TrackerName",
}
### ---------- END CONFIG ----------
### ---------- END CONFIG ----------
### ---------- END CONFIG ----------
# types
FastResume = NewType("FastResume", Dict[bytes, Any]) # OrderedDict[bytes, Any]
Torrent = NewType("Torrent", Dict[bytes, Any]) # OrderedDict[bytes, Any]
PathScript = NewType("PathScript", Path)
PathQbit = NewType("PathQbit", Path)
assert sys.version_info >= (3, 6), f'Python 3.6+ required, current version: {sys.version_info}'
assert isinstance(DRY_RUN, bool), 'DRY_RUN must be a boolean: DRY_RUN = True or DRY_RUN = False'
assert isinstance(TORRENT_DIR, str), 'TORRENT_DIR must be a string like: TORRENT_DIR = "/path/to/folder"'
TORRENT_DIR_PATH = PathScript(Path(TORRENT_DIR))
assert TORRENT_DIR_PATH.exists() and TORRENT_DIR_PATH.is_dir(), f'TORRENT_DIR does not exist or is not a folder: {TORRENT_DIR}'
assert LINK_TYPE in ["hardlink", "symlink"], 'LINK_TYPE must be either "hardlink" or "symlink": LINK_TYPE = "hardlink"'
assert isinstance(LINK_CATEGORY, str), 'LINK_CATEGORY must be a string like: LINK_CATEGORY = "xseed"'
assert isinstance(DUPLICATE_CATEGORIES, bool), 'DUPLICATE_CATEGORIES must be a boolean: DUPLICATE_CATEGORIES = True or DUPLICATE_CATEGORIES = False'
assert isinstance(LINK_DIR, str), 'LINK_DIR must be a string like: LINK_DIR = "/path/to/folder"'
LINK_DIR_PATH = PathScript(Path(LINK_DIR))
assert Path(LINK_DIR_PATH).exists() and Path(LINK_DIR_PATH).is_dir(), f'LINK_DIR does not exist or is not a folder: {LINK_DIR}'
assert isinstance(SCRIPT_MOUNT, str) and isinstance(QBIT_MOUNT, str), 'SCRIPT_MOUNT and QBIT_MOUNT must be strings, set to "" to disable'
assert SCRIPT_MOUNT.endswith("/") == QBIT_MOUNT.endswith("/"), 'SCRIPT_MOUNT and QBIT_MOUNT must both end with "/" or neither'
assert len(TRACKER_URL_NAME) > 0, 'TRACKER_URL_NAME must be a dictionary like: TRACKER_URL_NAME = {"https://tracker.url": "tracker_name"}'
# Logging
LOG_NAME = f"{Path(__file__).stem}.log"
LOG_FORMAT_DATE = "%Y-%m-%d %H:%M:%S"; LOG_FORMAT_STREAM = "%(asctime)s.%(msecs)03d %(levelname)s: %(message)s"; LOG_FORMAT_FILE = "%(asctime)s.%(msecs)03d %(levelname)s: %(message)s"
LOG_HANDLER_STREAM = logging.StreamHandler(sys.stdout)
LOG_HANDLER_FILE = logging.FileHandler(LOG_NAME)
LOG_HANDLER_STREAM.setFormatter(logging.Formatter(LOG_FORMAT_STREAM, LOG_FORMAT_DATE)); LOG_HANDLER_FILE.setFormatter(logging.Formatter(LOG_FORMAT_FILE, LOG_FORMAT_DATE))
log = logging.getLogger(LOG_NAME)
log.setLevel(logging.DEBUG); LOG_HANDLER_STREAM.setLevel(logging.DEBUG); LOG_HANDLER_STREAM.setLevel(logging.DEBUG)
log.addHandler(LOG_HANDLER_STREAM)
log.addHandler(LOG_HANDLER_FILE)
# Backup TORRENT_DIR
TORRENT_DIR_ORIG = TORRENT_DIR_PATH.with_name(TORRENT_DIR_PATH.name + "_orig")
assert not TORRENT_DIR_ORIG.exists(), f'TORRENT_DIR_ORIG already exists, delete it before running this script: {TORRENT_DIR_ORIG}'
log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Backing up TORRENT_DIR to TORRENT_DIR_ORIG: {TORRENT_DIR_ORIG}")
if not DRY_RUN:
shutil.copytree(TORRENT_DIR_PATH, TORRENT_DIR_ORIG)
assert TORRENT_DIR_ORIG.exists() and TORRENT_DIR_ORIG.is_dir(), f'Failed to copy TORRENT_DIR to TORRENT_DIR_ORIG: {TORRENT_DIR_ORIG}'
assert len(list(TORRENT_DIR_ORIG.glob("*"))) == len(list(TORRENT_DIR_PATH.glob("*"))), 'Failed to copy all files from TORRENT_DIR to TORRENT_DIR_ORIG'
CROSS_SEED_TAG = b"cross-seed"
TORRENT_CATEGORY_SUFFIX = ".cross-seed" # Not bytes
ORIGINAL_LAYOUT = b"Original"
SUBFOLDER_LAYOUT = b"Subfolder"
NO_SUBFOLDER_LAYOUT = b"NoSubfolder"
log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Processing cross-seed torrents in TORRENT_DIR: {TORRENT_DIR}")
total_cross_seeds: int = 0
total_linked: int = 0
for fastresume_file in TORRENT_DIR_PATH.glob("*.fastresume"):
fastresume: FastResume = bencodepy.decode_from_file(fastresume_file) # type: ignore
qbt_tags: List[bytes] = fastresume[b'qBt-tags'] # type: ignore
if CROSS_SEED_TAG not in qbt_tags:
continue
total_cross_seeds += 1
tracker_name: Union[str, None] = None
trackers: List[List[bytes]] = fastresume[b'trackers'] # type: ignore
for tracker_urls_b in trackers: # type: ignore
for tracker_url_b in tracker_urls_b: # type: ignore
tracker_url = tracker_url_b.decode().lower() # type: ignore
for url, name in TRACKER_URL_NAME.items():
if url in tracker_url:
tracker_name = name
break
if tracker_name:
break
if not tracker_name:
continue
old_save_path: PathQbit = PathQbit(Path(fastresume[b'qBt-savePath'].decode()) if fastresume.get(b'qBt-savePath') else Path(fastresume[b'save_path'].decode())) # type: ignore
old_save_path_script: PathScript = PathScript(old_save_path)
link_dir_qbit: PathQbit = PathQbit(LINK_DIR_PATH)
mounts_are_equal: bool = True
if old_save_path.parts[:2] != link_dir_qbit.parts[:2]:
mounts_are_equal = False
if not SCRIPT_MOUNT or not QBIT_MOUNT:
log.critical(f'FATAL ERROR: You must set SCRIPT_MOUNT and QBIT_MOUNT - Drive mismatch between save_path and LINK_DIR for {fastresume_file} (see instructions step #3): {old_save_path} --> {link_dir_qbit}') # type: ignore
exit(1)
qbit_root: str = str(Path(*old_save_path.parts[:2]))
new_link_root: str = str(Path(*Path(str(link_dir_qbit).replace(SCRIPT_MOUNT, QBIT_MOUNT, 1)).parts[:2]))
old_save_path_script = PathScript(Path(str(old_save_path).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
link_dir_qbit = PathQbit(Path(str(link_dir_qbit).replace(SCRIPT_MOUNT, QBIT_MOUNT, 1)))
assert qbit_root == new_link_root, f'FATAL ERROR: SCRIPT_MOUNT = "{SCRIPT_MOUNT}" or QBIT_MOUNT = "{QBIT_MOUNT}" is invalid - Drive mismatch between save_path and LINK_DIR for {fastresume_file} (see instructions step #3): {old_save_path} --> {link_dir_qbit}' # type: ignore
new_save_path_script = LINK_DIR_PATH/tracker_name
if old_save_path_script.resolve() == new_save_path_script.resolve():
continue
new_save_path = PathQbit(link_dir_qbit/tracker_name)
torrent_hash: str = fastresume_file.stem # qbit uses v2 hash trunctated to 40 chars if available
torrent_file = TORRENT_DIR_PATH/f"{torrent_hash}.torrent"
if not torrent_file.exists():
log.error(f"Torrent file not found: {torrent_file}")
continue
torrent: Torrent = bencodepy.decode_from_file(torrent_file) # type: ignore
info_dict: Dict[bytes, Any] = torrent[b'info'] # type: ignore
info_name: bytes = info_dict[b'name'] # type: ignore
torrent_name: str = info_name.decode()
log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}{torrent_name} [{torrent_hash}]: {old_save_path} --> {new_save_path}")
info_files: Union[List[Dict[bytes, Any]], None] = info_dict.get(b'files') # type: ignore
qbt_content_layout: bytes = fastresume[b'qBt-contentLayout'] # type: ignore
# Name is "" if multi file torrent without root folder, Replace empty path segments with "_" by convention, qbit removes / from name
# Implementation source from: https://github.com/qbittorrent/qBittorrent/blob/eba5cbb8039d4766daaa51ed4e16f867af3505ac/src/base/bittorrent/sessionimpl.cpp#L2759
file_paths: List[Path] = []
subfolder: str
root_folder: Union[str, None] = None
if not info_files:
file_path = Path(info_name.decode().replace("/", "")) # qbit removes / from name # type: ignore
file_paths.append(file_path)
subfolder = file_path.stem
if len(file_path.parts) > 1:
root_folder = file_path.parts[0]
else:
for file_info in info_files: # type: ignore
file_paths.append(Path(info_name.decode().replace("/", ""))/Path(*[p.decode() or "_" for p in file_info[b'path']])) # type: ignore
subfolder = Path(*[p.decode() or "_" for p in info_files[0][b'path']]).stem # type: ignore
for file_info in info_files: # type: ignore
file_path = Path(info_name.decode().replace("/", ""))/Path(*[p.decode() or "_" for p in file_info[b'path']]) # type: ignore
if len(file_path.parts) <= 1:
root_folder = None
break
if root_folder is None:
root_folder = file_path.parts[0]
elif root_folder != file_path.parts[0]:
root_folder = None
break
full_paths_map: Dict[PathQbit, PathQbit] = {}
for file_path in file_paths:
if qbt_content_layout == ORIGINAL_LAYOUT:
full_paths_map[old_save_path/file_path] = new_save_path/file_path
elif qbt_content_layout == SUBFOLDER_LAYOUT:
if root_folder: full_paths_map[old_save_path/file_path] = new_save_path/file_path
else: full_paths_map[old_save_path/subfolder/file_path] = new_save_path/file_path
elif qbt_content_layout == NO_SUBFOLDER_LAYOUT:
if root_folder: full_paths_map[old_save_path/Path(*file_path.parts[1:])] = new_save_path/file_path
else: full_paths_map[old_save_path/file_path] = new_save_path/file_path
else:
log.error(f"Unknown content layout: {qbt_content_layout.decode()}") # type: ignore
raise ValueError(f"Unknown content layout: {qbt_content_layout.decode()}") # type: ignore
skip: bool = False
for src, dest in full_paths_map.items():
if mounts_are_equal:
src_script: PathScript = PathScript(src)
dest_script: PathScript = PathScript(dest)
else:
src_script: PathScript = PathScript(Path(str(src).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
if not src_script.exists():
log.error(f"--- Source path does not exist")
log.debug(f"------ Path as QBIT sees: {src} --> {dest}")
log.debug(f"------ Path as SCRIPT sees: {src_script} --> {dest_script}")
skip = True
continue
log.info(f"--- {src} --> {dest}")
if skip:
continue
total_linked += 1
if DRY_RUN:
continue
already_exists: Set[PathScript] = set()
try:
for src, dest in full_paths_map.items():
if mounts_are_equal:
src_script: PathScript = PathScript(src)
dest_script: PathScript = PathScript(dest)
else:
src_script: PathScript = PathScript(Path(str(src).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
if dest_script.exists():
log.warning(f"--- Destination path already exists")
log.debug(f"------ Path as QBIT sees: {src} --> {dest}")
log.debug(f"------ Path as SCRIPT sees: {src_script} --> {dest_script}")
already_exists.add(dest_script)
continue
dest_script.parent.mkdir(parents=True, exist_ok=True)
if LINK_TYPE == "hardlink":
os.link(src_script, dest_script)
else:
os.symlink(src_script, dest_script)
except Exception as e:
log.exception(e)
for dest in full_paths_map.values():
if mounts_are_equal:
dest_script: PathScript = PathScript(dest)
else:
dest_script: PathScript = PathScript(Path(str(dest).replace(QBIT_MOUNT, SCRIPT_MOUNT, 1)))
if dest_script in already_exists:
continue
if dest_script.exists():
log.info(f"------ Removing created link: {dest_script}")
dest_script.unlink(missing_ok=True)
continue
new_save_path_b: bytes = str(new_save_path).encode()
fastresume[b'save_path'] = new_save_path_b
fastresume[b'qBt-savePath'] = new_save_path_b # Disables autoTMM https://github.com/qbittorrent/qBittorrent/blob/eba5cbb8039d4766daaa51ed4e16f867af3505ac/src/base/bittorrent/bencoderesumedatastorage.cpp#L238
fastresume[b'qBt-downloadPath'] = new_save_path_b
fastresume[b'qBt-contentLayout'] = ORIGINAL_LAYOUT
if DUPLICATE_CATEGORIES:
category: str = fastresume.get(b'qBt-category', b"").decode() # type: ignore
if category and category != LINK_CATEGORY and not category.endswith(TORRENT_CATEGORY_SUFFIX): # type: ignore
cat_tag: bytes = f"{category}{TORRENT_CATEGORY_SUFFIX}".encode()
if not cat_tag in qbt_tags:
qbt_tags.append(cat_tag) # type: ignore
fastresume[b'qBt-category'] = LINK_CATEGORY.encode()
fastresume.pop(b'mapped_files', None) # This is used if the files have been renamed, but we are using ORIGINAL_LAYOUT https://libtorrent.org/manual-ref.html#fast-resume # type: ignore
with open(fastresume_file, "wb") as f:
f.write(bencodepy.encode(fastresume)) # type: ignore
log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Total cross-seed torrents: {total_cross_seeds}")
log.info(f"{'(DRY RUN) ' if DRY_RUN else ''}Total newly linked torrents: {total_linked}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment