Created
July 21, 2023 14:25
-
-
Save hrz6976/a37664ce76c62d6cf6454926ffac365e to your computer and use it in GitHub Desktop.
Move unnested bangumi files into named folders
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from functools import lru_cache | |
from json import JSONDecodeError | |
import logging | |
import re | |
import os | |
import shutil | |
from urllib.parse import quote | |
import requests | |
@dataclass
class Episode:
    """Parsed metadata extracted from a single fansub release filename."""

    title_en: str | None  # English title, None if not detected
    title_zh: str | None  # Chinese title, None if not detected
    title_jp: str | None  # Japanese title, None if not detected
    season: int  # season number (1 when no season marker is present)
    season_raw: str  # original season marker text, e.g. "S2" / "第二季" ("" if none)
    episode: int  # episode number (0 when none could be parsed)
    sub: str  # subtitle-language tag, e.g. "简体" / "CHT"
    group: str  # release-group name (text of the first [...] tag)
    resolution: str  # resolution tag, e.g. "1080p"
    source: str  # stream/broadcast source tag, e.g. "Baha", "B-Global"
# Module-level logger; the __main__ block configures handlers via basicConfig.
logger = logging.getLogger(__name__)

# First run of digits inside an episode marker (e.g. "[05]" -> "05").
EPISODE_RE = re.compile(r"\d+")
# Splits a title into three groups: (series/prefix text, episode marker,
# trailing tag text) -- process() unpacks .groups() in exactly that order.
TITLE_RE = re.compile(
    r"(.*|\[.*])( -? \d+|\[\d+]|\[\d+.?[vV]\d]|第\d+[话話集]|\[第?\d+[话話集]]|\[\d+.?END]|[Ee][Pp]?\d+)(.*)"
)
# Video resolution tokens.
RESOLUTION_RE = re.compile(r"1080|720|2160|4K")
# Stream / broadcast source tokens.
SOURCE_RE = re.compile(r"B-Global|[Bb]aha|[Bb]ilibili|AT-X|Web")
# Subtitle-language tokens (CJK markers or CH/BIG5/GB tags).
SUB_RE = re.compile(r"[简繁日字幕]|CH|BIG5|GB")
# Any char that is NOT word/whitespace/CJK/kana/dash; used to split prefixes.
PREFIX_RE = re.compile(r"[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff-]")
# Chinese numerals 1-10, for season markers like "第二季".
CHINESE_NUMBER_MAP = {
    "一": 1,
    "二": 2,
    "三": 3,
    "四": 4,
    "五": 5,
    "六": 6,
    "七": 7,
    "八": 8,
    "九": 9,
    "十": 10,
}
def get_group(name: str) -> str:
    """Return the release group: the text between the first bracket pair."""
    parts = re.split(r"[\[\]]", name)
    return parts[1]
def pre_process(raw_name: str) -> str:
    """Normalise full-width CJK brackets to ASCII so downstream regexes match."""
    return raw_name.translate(str.maketrans("【】", "[]"))
def prefix_process(raw: str, group: str) -> str:
    """Strip the release-group tag and short schedule/region tokens from *raw*.

    Removes the group tag together with one surrounding character on each
    side (typically its brackets), then drops short "new series" tokens such
    as "10月新番" and region-restriction notes ("港澳台地区") the same way.
    """
    # Bug fix: the group name may contain regex metacharacters (e.g. "+"),
    # so it must be escaped before being embedded in a pattern.
    raw = re.sub(f".{re.escape(group)}.", "", raw)
    # Same character class as module-level PREFIX_RE (inlined so the
    # function is self-contained): split on anything that is not
    # word/whitespace/CJK/kana/dash.
    raw_process = re.sub(r"[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff-]", "/", raw)
    arg_group = [arg for arg in raw_process.split("/") if arg]
    if len(arg_group) == 1:
        arg_group = arg_group[0].split(" ")
    for arg in arg_group:
        if re.search(r"新番|月?番", arg) and len(arg) <= 5:
            raw = re.sub(f".{re.escape(arg)}.", "", raw)
        elif re.search(r"港澳台地区", arg):
            raw = re.sub(f".{re.escape(arg)}.", "", raw)
    return raw
def season_process(season_info: str):
    """Extract a season marker from the title text.

    Returns (name_without_marker, season_raw, season). When no marker such
    as "S2" / "Season 2" / "第二季" is found, returns (text, "", 1).
    """
    name_season = season_info
    # if re.search(r"新番|月?番", season_info):
    #     name_season = re.sub(".*新番.", "", season_info)
    #     # strip the "new series" tag
    #     name_season = re.sub(r"^[^]】]*[]】]", "", name_season).strip()
    season_rule = r"S\d{1,2}|Season \d{1,2}|[第].[季期]"
    # Replace brackets with spaces so markers inside [...] are still found.
    name_season = re.sub(r"[\[\]]", " ", name_season)
    seasons = re.findall(season_rule, name_season)
    if not seasons:
        return name_season, "", 1
    name = re.sub(season_rule, "", name_season)
    for season in seasons:
        season_raw = season
        if re.search(r"Season|S", season) is not None:
            # Western style marker: "S2" / "Season 2" -> integer.
            season = int(re.sub(r"Season|S", "", season))
            break
        elif re.search(r"[第 ].*[季期(部分)]|部分", season) is not None:
            # Chinese style marker: "第2季" (digits) or "第二季" (numerals).
            season_pro = re.sub(r"[第季期 ]", "", season)
            try:
                season = int(season_pro)
            except ValueError:
                # Fall back to the Chinese-numeral lookup table.
                season = CHINESE_NUMBER_MAP[season_pro]
            break
    return name, season_raw, season
def name_process(name: str):
    """Split a cleaned series title into (name_en, name_zh, name_jp).

    Components that cannot be identified are returned as None.
    """
    name_en, name_zh, name_jp = None, None, None
    name = name.strip()
    name = re.sub(r"[((]仅限港澳台地区[))]", "", name)
    # First try explicit separators: "/", double spaces, or "-" + 2 spaces.
    split = [part for part in re.split(r"/|\s{2}|-\s{2}", name) if part]
    if len(split) == 1:
        if re.search("_{1}", name) is not None:
            split = re.split("_", name)
        elif re.search(" - {1}", name) is not None:
            split = re.split("-", name)
    if len(split) == 1:
        # No separators: group consecutive space-separated words by script
        # (1 = CJK ideographs, 0 = Latin, 2 = kana/other, -1 = unknown).
        split_space = split[0].split(" ")
        language_pattern = []
        for item in split_space:
            if re.search(r"[\u4e00-\u9fa5]{2,}", item) is not None:
                language_pattern.append(1)
            elif re.search(r"[a-zA-Z]{2,}", item) is not None:
                language_pattern.append(0)
            elif re.search(r"[\u0800-\u4e00]{2,}", item) is not None:
                language_pattern.append(2)
            else:
                # Bug fix: tokens without a recognisable script (digits,
                # punctuation) previously got no entry at all, which made
                # language_pattern shorter than split_space and raised
                # IndexError in the merge loop below.
                language_pattern.append(-1)
        split = [split_space[0]]
        for i in range(1, len(split_space)):
            if language_pattern[i] == language_pattern[i - 1]:
                # Same script as the previous token: merge into one chunk.
                split[-1] += " " + split_space[i]
            else:
                split.append(split_space[i])
    for item in split:
        if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
            name_jp = item.strip()
        elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
            name_zh = item.strip()
        elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
            name_en = item.strip()
    # Bug fix: `name_en not in name` raised TypeError when name_en was None
    # (e.g. a pure-Chinese title), and that TypeError was not caught by the
    # `except ValueError` in process().
    if name_en is not None and name_en not in name:
        name_en = None
    return name_en, name_zh, name_jp
def find_tags(other):
    """Scan leftover tag text for subtitle, resolution and source tokens.

    Returns (sub, resolution, source); each is None when not found, and the
    sub tag has container suffixes stripped via clean_sub().
    """
    tokens = re.sub(r"[\[\]()()]", " ", other).split(" ")
    sub = resolution = source = None
    for token in tokens:
        if not token:
            continue
        if SUB_RE.search(token):
            sub = token
        elif RESOLUTION_RE.search(token):
            resolution = token
        elif SOURCE_RE.search(token):
            source = token
    return clean_sub(sub), resolution, source
def clean_sub(sub: str | None) -> str | None: | |
if sub is None: | |
return sub | |
return re.sub(r"_MP4|_MKV", "", sub) | |
def process(raw_title: str):
    """Parse a raw release title into its components.

    Returns a 10-tuple (name_en, name_zh, name_jp, season, season_raw,
    episode, sub, resolution, source, group), or None when the title does
    not match TITLE_RE at all.
    """
    raw_title = raw_title.strip()
    content_title = pre_process(raw_title)  # normalise full-width brackets
    group = get_group(content_title)  # release-group tag
    match_obj = TITLE_RE.match(content_title)
    # Bug fix: an unmatched title previously raised AttributeError on
    # match_obj.groups(); raw_parser() explicitly checks for None, so
    # return None instead of crashing.
    if match_obj is None:
        return None
    season_info, episode_info, other = (g.strip() for g in match_obj.groups())
    process_raw = prefix_process(season_info, group)  # strip prefixes
    raw_name, season_raw, season = season_process(process_raw)  # season marker
    name_en, name_zh, name_jp = "", "", ""
    try:
        name_en, name_zh, name_jp = name_process(raw_name)  # split names
    except ValueError:
        pass
    # Episode number: first digit run in the marker, 0 when absent.
    raw_episode = EPISODE_RE.search(episode_info)
    episode = int(raw_episode.group()) if raw_episode is not None else 0
    sub, dpi, source = find_tags(other)  # remaining tag text
    return (
        name_en,
        name_zh,
        name_jp,
        season,
        season_raw,
        episode,
        sub,
        dpi,
        source,
        group,
    )
def raw_parser(raw: str) -> Episode | None:
    """Parse a raw title into an Episode, or None when parsing fails."""
    parsed = process(raw)
    if parsed is None:
        logger.error(f"Parser cannot analyse {raw}")
        return None
    (name_en, name_zh, name_jp, season, season_raw,
     episode, sub, dpi, source, group) = parsed
    return Episode(
        title_en=name_en,
        title_zh=name_zh,
        title_jp=name_jp,
        season=season,
        season_raw=season_raw,
        episode=episode,
        sub=sub,
        group=group,
        resolution=dpi,
        source=source,
    )
# Shared HTTP session for bangumi.tv lookups (reuses one connection pool).
s = requests.Session()
def _get_bangumi_name(title: str) -> str:
    """Look up *title* on bangumi.tv and return its Chinese name (name_cn).

    Raises requests.HTTPError on bad status, JSONDecodeError when the API
    returns non-JSON (observed when rate-limited), and KeyError/IndexError
    when the search result has no usable entry.
    """
    global s
    import time
    # Crude rate limiting between API calls.
    time.sleep(3)
    r = s.get(f"https://api.bgm.tv/search/subject/{quote(title)}?type=2",
              headers={
                  "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
                  "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
                  "cache-control": "no-cache",
                  "pragma": "no-cache",
                  "sec-ch-ua": "\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Microsoft Edge\";v=\"114\"",
                  "sec-ch-ua-mobile": "?0",
                  "sec-ch-ua-platform": "\"Windows\"",
                  "sec-fetch-dest": "document",
                  "sec-fetch-mode": "navigate",
                  "sec-fetch-site": "none",
                  "sec-fetch-user": "?1",
                  "upgrade-insecure-requests": "1"
              },
              timeout=10)
    r.raise_for_status()
    try:
        data = r.json()
        return data["list"][0]["name_cn"]
    except JSONDecodeError:
        logger.error(f"Cannot decode json for {r.url}")
        # Dump the raw response bytes for post-mortem. Bug fix: the old
        # text-mode write with encoding="iso-8859-1" raised
        # UnicodeEncodeError for CJK response bodies.
        with open("error.html", "wb") as f:
            f.write(r.content)
        # hit time limit
        raise
    except Exception:
        logger.warning(f"Cannot find bangumi info for {title}")
        raise
@lru_cache
def get_bangumi_name(title: str) -> str:
    """Resolve *title* to its bangumi.tv Chinese name, with one shortened retry.

    On failure, cuts the title at region notes / spaces / dashes and retries;
    raises ValueError when neither attempt succeeds.
    """
    try:
        return _get_bangumi_name(title)
    except Exception:
        shortened = title
        for tok in ("(僅限", " ", "-"):
            if tok in shortened:
                shortened = shortened.split(tok)[0]
        # Bug fix 1: only recurse when the title actually changed, otherwise
        # the retry loops forever (lru_cache does not memoize in-flight calls).
        if shortened != title:
            try:
                # Bug fix 2: the retry result was previously discarded, so a
                # successful retry still fell through to the ValueError below.
                return get_bangumi_name(shortened)
            except Exception:
                pass
        raise ValueError(f"Cannot find bangumi info for {title}")
def get_full_path(path: str) -> str:
    """Expand ~ and return the absolute form of *path*.

    Raises FileNotFoundError when the path does not exist.
    """
    full = os.path.abspath(os.path.expanduser(path))
    # Bug fix: the original tested `os.path.exists` (the function object,
    # always truthy) instead of calling it, so the check never fired.
    if not os.path.exists(full):
        raise FileNotFoundError(f"Cannot find {path}")
    return full
def move_file(old_path, new_path):
    """Move *old_path* to *new_path*, creating parent directories as needed."""
    logger.info(f"{old_path} >> {new_path}")
    new_dir = os.path.dirname(new_path)
    # exist_ok avoids the check-then-create race of the original; the guard
    # skips makedirs("") when new_path is a bare filename.
    if new_dir:
        os.makedirs(new_dir, exist_ok=True)
    shutil.move(old_path, new_path)
def rename_episode(episode_path: str):
    """Move one episode file into a folder named after its bangumi.tv title.

    Raises FileNotFoundError (bad path), ValueError (unparseable filename or
    no bangumi match), or whatever the lookup/move helpers raise.
    """
    fullpath = get_full_path(episode_path)
    dirname, basename = os.path.split(fullpath)
    # Parse metadata from the filename without its extension.
    fname, _suffix = os.path.splitext(basename)
    episode = raw_parser(fname)
    # Bug fix: raw_parser can return None; the original then crashed with
    # AttributeError on .title_zh instead of reporting the file.
    if episode is None:
        raise ValueError(f"Cannot parse {basename}")
    _title = episode.title_zh or episode.title_en or episode.title_jp or episode.group
    bgm_name = get_bangumi_name(_title)
    move_file(fullpath, os.path.join(dirname, bgm_name, basename))
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    # default argument
    parser.add_argument("path", help="path to file")
    parser.add_argument("--recursive", action="store_true", help="recursive", default=False)
    args = parser.parse_args()
    print(args)
    logging.basicConfig(level=logging.INFO)

    # Build one stream of candidate file paths instead of duplicating the
    # skip/try/except body in both branches. (Also drops the unused
    # `import sys` the original carried.)
    if args.recursive:
        candidates = (
            os.path.join(root, name)
            for root, _dirs, names in os.walk(args.path)
            for name in names
        )
    else:
        candidates = (
            os.path.join(args.path, name)
            for name in os.listdir(args.path)
            if os.path.isfile(os.path.join(args.path, name))
        )
    for file in candidates:
        # skip hidden files
        if os.path.basename(file).startswith("."):
            continue
        try:
            rename_episode(file)
        except Exception as e:
            logger.error(f"File: {file} Error: {e}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment