Created
November 1, 2023 11:31
-
-
Save noaione/fc658289881920744448478186bca915 to your computer and use it in GitHub Desktop.
LANraragi / Anchira.to ingestion system | Ingest torrent folder into your LANraragi content directory
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Anchira.to rip of FAKKU/Irodori/2D/etc.
import shutil
import sys
import traceback
from pathlib import Path
from typing import TypedDict
from zipfile import ZipFile

from msgspec import Struct, ValidationError, field, yaml

from lrr_common import LRRIngestConfig, read_config
class AnchiraInfo(Struct):
    """Metadata decoded from the YAML file bundled inside an archive.

    Every attribute is bound to a capitalised key of the YAML document through
    ``field(name=...)``. The list-valued attributes fall back to an empty list
    when their key is absent; the other attributes declare no default.
    """

    # Entries without a default.
    url: str = field(name="URL")
    source: str = field(name="Source")
    title: str = field(name="Title")
    released: int = field(name="Released")
    pages: int = field(name="Pages")
    thumbnail: int = field(name="Thumbnail")

    # List entries; each defaults to an empty list.
    circles: list[str] = field(name="Circle", default_factory=list)
    parodies: list[str] = field(name="Parody", default_factory=list)
    magazines: list[str] = field(name="Magazine", default_factory=list)
    tags: list[str] = field(name="Tags", default_factory=list)
class GroupingPaths(TypedDict):
    """Archive paths bucketed by the group they were sorted into.

    ``main`` fills one list per key and ``move_files`` consumes them; the
    ``doujins`` bucket receives every archive that matched no other group.
    """

    chapters: list[Path]
    color: list[Path]
    illustrations: list[Path]
    non_h: list[Path]
    western: list[Path]
    doujins: list[Path]
def test_zip(zip_path: Path) -> bool:
    """Report whether *zip_path* can be opened as a zip archive.

    Args:
        zip_path: Path of the file to probe.

    Returns:
        ``True`` when ``ZipFile`` opens the file without raising, ``False``
        for any failure while opening it.
    """
    try:
        # Only the ability to open the archive matters; the context manager
        # closes it again immediately.
        with ZipFile(zip_path):
            return True
    except Exception:
        # Deliberately broad: whatever the reason, a file that cannot be
        # opened is reported as unusable instead of aborting the caller.
        return False


# Despite its name this is a helper, not a test case; keep pytest from
# collecting it when this module's names are imported into a test module.
test_zip.__test__ = False
def info_grabber(zip_path: Path) -> AnchiraInfo | None:
    """Extract the metadata stored in an archive's bundled YAML file.

    The archive is opened exactly once: a failure to open it is reported the
    same way a failed ``test_zip`` probe would be, without validating the file
    first and then opening it a second time.

    Args:
        zip_path: Archive (zip/cbz) to inspect.

    Returns:
        The ``AnchiraInfo`` decoded from the first ``.yaml`` member that
        passes validation, or ``None`` when the file cannot be opened as a
        zip archive, holds no ``.yaml`` member, or no member validates.
    """
    try:
        archive = ZipFile(zip_path)
    except Exception:
        # Broad on purpose (mirrors ``test_zip``): any failure to open the
        # file means it is skipped rather than aborting the whole run.
        print(f"{zip_path.name} is not a valid zip file.")
        return None

    with archive:
        for member in archive.namelist():
            if not member.endswith(".yaml"):
                continue
            try:
                return yaml.decode(archive.read(member), type=AnchiraInfo)
            except ValidationError as e:
                # Report the problem and keep looking: another ".yaml"
                # member may still hold valid metadata.
                print(f"Error decoding {zip_path.name}: {e}")
                traceback.print_exc()
    return None
def has_mag(info: AnchiraInfo) -> bool:
    """Tell whether the metadata lists at least one magazine."""
    return bool(info.magazines)
def move_files(groupings: GroupingPaths, target_folder: Path) -> None:
    """Move every grouped archive into its directory below *target_folder*.

    Files are moved with ``shutil.move`` so the operation also works when the
    source and the target live on different filesystems, where a plain rename
    fails.

    Args:
        groupings: Archive paths keyed by group name.
        target_folder: Root directory that receives the archives.

    Destination per group:
        * ``doujins`` -> ``target_folder`` itself
        * ``non_h``   -> ``target_folder / "non_h" / "non-h"``
        * any other   -> ``target_folder / <group name>``

    Directories are only created for groups that contain at least one file.
    """
    print(f"Moving files to target folder: {target_folder}")
    group: list[Path]
    for group_name, group in groupings.items():
        print(f" Moving '{group_name}': {len(group)}")
        if not group:
            # Nothing to move, so no (empty) directory is created either.
            continue

        # The destination depends on the group only; resolve it once.
        if group_name == "doujins":
            target_path = target_folder
        elif group_name == "non_h":
            # NOTE(review): resolves to "<target>/non_h/non-h". Kept unchanged
            # so existing libraries keep their layout - confirm the double
            # nesting is intended.
            target_path = target_folder / group_name / "non-h"
        else:
            target_path = target_folder / group_name
        target_path.mkdir(parents=True, exist_ok=True)

        for file in group:
            shutil.move(file, target_path / file.name)
def archive_grabber(source_folder: Path) -> list[Path]:
    """Collect the ``*.cbz`` and ``*.zip`` files directly inside *source_folder*.

    The ``.cbz`` matches come first in the returned list, followed by the
    ``.zip`` matches. Sub-directories are not searched.
    """
    found = [*source_folder.glob("*.cbz")]
    found.extend(source_folder.glob("*.zip"))
    return found
def insensitive_in(item: str, collection: list[str]) -> bool:
    """Case-insensitive membership test of *item* in *collection*.

    Both sides are compared after ``str.casefold``.
    """
    needle = item.casefold()
    return any(needle == candidate.casefold() for candidate in collection)
def should_skip(info: AnchiraInfo, config: LRRIngestConfig) -> bool:
    """Decide whether an archive must be ignored according to *config*.

    Returns ``True`` as soon as any tag, circle, parody or magazine of *info*
    appears (case-insensitively) in the matching ignore list of *config*,
    otherwise ``False``.
    """
    # Each metadata list is paired with the ignore list it is checked against,
    # in the order tags, circles, parodies, magazines.
    checks = (
        (info.tags, config.ignored_tags),
        (info.circles, config.ignored_circles),
        (info.parodies, config.ignored_parodies),
        (info.magazines, config.ignored_magazines),
    )
    return any(
        insensitive_in(value, ignored)
        for values, ignored in checks
        for value in values
    )
def _pick_group(info: AnchiraInfo) -> str:
    """Return the ``GroupingPaths`` key an archive with this metadata goes to.

    The tag checks run in a fixed order (Illustration, Non-H, Western, Color);
    the first matching tag decides. "Non-H" and "Color" archives, as well as
    archives matching no tag, only get their dedicated group when a magazine
    is listed and fall back to ``doujins`` otherwise.
    """
    if insensitive_in("Illustration", info.tags):
        return "illustrations"
    if insensitive_in("Non-H", info.tags):
        return "non_h" if has_mag(info) else "doujins"
    if insensitive_in("Western", info.tags):
        return "western"
    if insensitive_in("Color", info.tags):
        return "color" if has_mag(info) else "doujins"
    # No special tag: magazine chapters vs. everything else.
    return "chapters" if has_mag(info) else "doujins"


def main(source_folder: Path, target_folder: Path) -> None:
    """Sort the archives found in *source_folder* and move them to *target_folder*.

    Archives without readable metadata and archives rejected by the ignore
    lists of the configuration are left where they are. A per-group count is
    printed before the files are moved.
    """
    config = read_config()
    # Key order is kept as-is: ``move_files`` walks the groups in this order.
    groupings: GroupingPaths = {
        "chapters": [],
        "color": [],
        "illustrations": [],
        "non_h": [],
        "western": [],
        "doujins": [],  # everything else
    }

    print(f"Searching {source_folder} for zip/cbz files...")
    for cbz_file in archive_grabber(source_folder):
        info = info_grabber(cbz_file)
        if info is None:
            print(f" Could not find info for {cbz_file.name}")
            continue
        print(f" Found info for {cbz_file.name}")

        if should_skip(info, config):
            print(f" Skipping {cbz_file.name}")
            continue

        groupings[_pick_group(info)].append(cbz_file)

    print("Groupings:")
    summary = (
        ("Chapters", "chapters"),
        ("Color", "color"),
        ("Illustrations", "illustrations"),
        ("Non-H", "non_h"),
        ("Western", "western"),
        ("Others", "doujins"),
    )
    for label, key in summary:
        print(f" {label}: {len(groupings[key])}")

    move_files(groupings, target_folder)
if __name__ == "__main__":
    import argparse

    # Command line: one positional source folder plus a mandatory target.
    parser = argparse.ArgumentParser(description="Anchira.to rip of FAKKU/Irodori/2D/etc.")
    parser.add_argument("folder", type=Path, help="Folder to search for zip/cbz files")
    parser.add_argument("-o", "--target", type=Path, help="Target folder to move files to", required=True)
    args = parser.parse_args()

    folder: Path = args.folder
    target: Path = args.target

    # Validate the source folder before doing any work.
    problem = None
    if not folder.exists():
        problem = "Folder does not exist."
    elif not folder.is_dir():
        problem = "Folder is not a directory."
    if problem is not None:
        print(problem)
        sys.exit(1)

    main(folder, target)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Common helper | |
from pathlib import Path | |
from msgspec import Struct, field, yaml | |
# Names exported by a star import of this module.
__all__ = ("save_config", "read_config", "LRRIngestConfig")

# Absolute directory containing this module; the config file is looked up
# and written here.
CURRENT_DIR = Path(__file__).parent.absolute()
class LRRIngestConfig(Struct):
    """Ignore lists read from the ingest configuration file.

    Each attribute maps to a hyphenated key of the YAML document and defaults
    to an empty list when the key is missing.
    """

    ignored_tags: list[str] = field(name="ignored-tags", default_factory=list)
    ignored_circles: list[str] = field(name="ignored-circles", default_factory=list)
    ignored_parodies: list[str] = field(name="ignored-parodies", default_factory=list)
    ignored_magazines: list[str] = field(name="ignored-magazines", default_factory=list)
def save_config(config: LRRIngestConfig) -> None:
    """Encode *config* as YAML and write it to ``config.yaml`` in ``CURRENT_DIR``."""
    (CURRENT_DIR / "config.yaml").write_bytes(yaml.encode(config))
def read_config() -> LRRIngestConfig:
    """Load the ingest configuration stored in ``CURRENT_DIR``.

    Lookup order:
        1. ``config.yaml`` is decoded when it exists.
        2. Otherwise an existing ``config.yml`` is renamed to ``config.yaml``
           and then decoded.
        3. When neither file exists, a default configuration is created,
           saved through ``save_config`` and returned.

    Returns:
        The decoded (or freshly created) ``LRRIngestConfig``.
    """
    config_yaml = CURRENT_DIR / "config.yaml"
    if not config_yaml.exists():
        legacy_yml = CURRENT_DIR / "config.yml"
        if not legacy_yml.exists():
            config = LRRIngestConfig()
            save_config(config)
            return config
        # Move config.yml to config.yaml. The content must be read from the
        # new location afterwards: once renamed, the old path no longer
        # exists and reading it would raise FileNotFoundError.
        legacy_yml.rename(config_yaml)
    return yaml.decode(config_yaml.read_bytes(), type=LRRIngestConfig)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment