Skip to content

Instantly share code, notes, and snippets.

@hrz6976
Created July 21, 2023 14:25
Show Gist options
  • Save hrz6976/a37664ce76c62d6cf6454926ffac365e to your computer and use it in GitHub Desktop.
Save hrz6976/a37664ce76c62d6cf6454926ffac365e to your computer and use it in GitHub Desktop.
Move un-nested bangumi episode files into folders named after their series
from dataclasses import dataclass
from functools import lru_cache
from json import JSONDecodeError
import logging
import re
import os
import shutil
from urllib.parse import quote
import requests
@dataclass
class Episode:
title_en: str | None
title_zh: str | None
title_jp: str | None
season: int
season_raw: str
episode: int
sub: str
group: str
resolution: str
source: str
# Module-wide logger; the script entry point configures logging.
logger = logging.getLogger(__name__)

# First run of digits inside the episode token.
EPISODE_RE = re.compile(r"\d+")

# Splits a release name into (title/season part, episode token, trailing tags).
TITLE_RE = re.compile(
    r"(.*|\[.*])( -? \d+|\[\d+]|\[\d+.?[vV]\d]|第\d+[话話集]|\[第?\d+[话話集]]|\[\d+.?END]|[Ee][Pp]?\d+)(.*)"
)

# Tag detectors applied by find_tags to the text after the episode token.
RESOLUTION_RE = re.compile(r"1080|720|2160|4K")
SOURCE_RE = re.compile(r"B-Global|[Bb]aha|[Bb]ilibili|AT-X|Web")
SUB_RE = re.compile(r"[简繁日字幕]|CH|BIG5|GB")

# Any character that is not a word character, whitespace, CJK ideograph,
# kana or hyphen; prefix_process treats these as field separators.
PREFIX_RE = re.compile(r"[^\w\s\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff-]")

# Chinese numerals one to ten mapped to their integer values.
CHINESE_NUMBER_MAP = {
    numeral: value
    for value, numeral in enumerate("一二三四五六七八九十", start=1)
}
def get_group(name: str) -> str:
    """Return the second field of *name* after splitting on square brackets.

    For a name that starts with ``[group]`` this is the release group.

    Raises:
        IndexError: if *name* contains no square bracket at all.
    """
    fields = re.split(r"[\[\]]", name)
    return fields[1]
def pre_process(raw_name: str) -> str:
    """Return *raw_name* with the lenticular brackets 【 】 turned into [ ]."""
    bracket_table = str.maketrans("【】", "[]")
    return raw_name.translate(bracket_table)
def prefix_process(raw: str, group: str) -> str:
    """Strip the release-group tag and seasonal/region notices from *raw*.

    Args:
        raw: The title part of a release name (text before the episode token).
        group: The release group name, matched literally.

    Returns:
        *raw* without the group tag, without short fields that mention
        "新番" / "番" (at most five characters long) and without fields that
        mention "港澳台地区". Each removal also drops the single character on
        either side of the match (normally the enclosing brackets).
    """
    # The group comes straight from the file name, so escape it: characters
    # such as "+", "(" or "." must not act as regex syntax. An empty group
    # would turn the pattern into ".." and delete the whole string.
    if group:
        raw = re.sub(f".{re.escape(group)}.", "", raw)

    fields = [field for field in PREFIX_RE.sub("/", raw).split("/") if field]
    if len(fields) == 1:
        # A single field means no bracket-like separators; fall back to spaces.
        fields = fields[0].split(" ")

    for field in fields:
        is_season_tag = re.search(r"新番|月?番", field) and len(field) <= 5
        if is_season_tag or re.search(r"港澳台地区", field):
            raw = re.sub(f".{re.escape(field)}.", "", raw)
    return raw
# Season markers: "S2", "Season 2", or 第X季 / 第X期 with a single character X.
_SEASON_MARKER_RE = re.compile(r"S\d{1,2}|Season \d{1,2}|[第].[季期]")


def season_process(season_info: str) -> tuple[str, str, int]:
    """Separate the season marker from a title and convert it to a number.

    Square brackets in *season_info* are replaced by spaces first.

    Returns:
        ``(name, season_raw, season)``: the title with every season marker
        removed, the marker that determined the season exactly as written,
        and the season number. Without a usable marker the result is
        ``(name, "", 1)``.

    Markers are examined in order. A Latin marker ("S2", "Season 2") or a
    Chinese-numeral ordinal (第二季) ends the scan; a digit ordinal (第2季)
    does not, so a later marker can still override it.
    """
    name_season = re.sub(r"[\[\]]", " ", season_info)
    markers = _SEASON_MARKER_RE.findall(name_season)
    if not markers:
        return name_season, "", 1

    name = _SEASON_MARKER_RE.sub("", name_season)
    season_raw, season = "", 1
    for marker in markers:
        if not marker.startswith("第"):
            # "S2" / "Season 2": int() tolerates the leading space left behind.
            season_raw, season = marker, int(re.sub(r"Season|S", "", marker))
            break
        ordinal = re.sub(r"[第季期 ]", "", marker)
        try:
            season_raw, season = marker, int(ordinal)
        except ValueError:
            if ordinal in CHINESE_NUMBER_MAP:
                season_raw, season = marker, CHINESE_NUMBER_MAP[ordinal]
                break
            # Unknown numeral (e.g. 第Ⅱ季): ignore this marker instead of
            # failing the whole parse with a KeyError.
    return name, season_raw, season
def name_process(name: str):
name_en, name_zh, name_jp = None, None, None
name = name.strip()
name = re.sub(r"[((]仅限港澳台地区[))]", "", name)
split = re.split(r"/|\s{2}|-\s{2}", name)
while "" in split:
split.remove("")
if len(split) == 1:
if re.search("_{1}", name) is not None:
split = re.split("_", name)
elif re.search(" - {1}", name) is not None:
split = re.split("-", name)
if len(split) == 1:
split_space = split[0].split(" ")
language_pattern = []
for item in split_space:
if re.search(r"[\u4e00-\u9fa5]{2,}", item) is not None:
language_pattern.append(1)
elif re.search(r"[a-zA-Z]{2,}", item) is not None:
language_pattern.append(0)
elif re.search(r"[\u0800-\u4e00]{2,}", item) is not None:
language_pattern.append(2)
split = [split_space[0]]
for i in range(1, len(split_space)):
# 如果当前字符串的语言与上一个字符串的语言相同
if language_pattern[i] == language_pattern[i - 1]:
# 合并这两个字符串
split[-1] += " " + split_space[i]
else:
# 否则,将当前字符串添加到结果列表中
split.append(split_space[i])
for item in split:
if re.search(r"[\u0800-\u4e00]{2,}", item) and not name_jp:
name_jp = item.strip()
elif re.search(r"[\u4e00-\u9fa5]{2,}", item) and not name_zh:
name_zh = item.strip()
elif re.search(r"[a-zA-Z]{3,}", item) and not name_en:
name_en = item.strip()
if name_en not in name:
name_en = None
return name_en, name_zh, name_jp
def find_tags(other):
    """Pick subtitle, resolution and source tags out of the trailing tag text.

    Args:
        other: The part of the release name that follows the episode token.

    Returns:
        ``(sub, resolution, source)``; an entry is None when no element
        matched. Each element is tested against SUB_RE, then RESOLUTION_RE,
        then SOURCE_RE, and a later matching element replaces an earlier one.
        The subtitle tag is passed through ``clean_sub``.
    """
    # Brackets and parentheses separate the tags. Full-width parentheses are
    # accepted as well; the original class listed the ASCII pair twice.
    elements = re.sub(r"[\[\]()()]", " ", other).split(" ")
    sub, resolution, source = None, None, None
    for element in elements:
        if not element:
            continue
        if SUB_RE.search(element):
            sub = element
        elif RESOLUTION_RE.search(element):
            resolution = element
        elif SOURCE_RE.search(element):
            source = element
    return clean_sub(sub), resolution, source
def clean_sub(sub: str | None) -> str | None:
if sub is None:
return sub
return re.sub(r"_MP4|_MKV", "", sub)
def process(raw_title: str):
    """Parse a release file name into its components.

    Args:
        raw_title: File name without extension, e.g.
            ``[Group] Title S2 - 05 [1080p][CHS]``.

    Returns:
        A 10-tuple ``(name_en, name_zh, name_jp, season, season_raw,
        episode, sub, resolution, source, group)``, or None when the name has
        no bracketed group or does not match TITLE_RE. ``raw_parser`` relies
        on that None to report an unparseable name.
    """
    # Normalise full-width brackets before any matching.
    content_title = pre_process(raw_title.strip())

    # Release group: the text inside the first bracket pair.
    try:
        group = get_group(content_title)
    except IndexError:
        return None

    # Split into title/season part, episode token and trailing tags.
    match_obj = TITLE_RE.match(content_title)
    if match_obj is None:
        return None
    season_info, episode_info, other = (
        part.strip() for part in match_obj.groups()
    )

    # Remove the group tag and seasonal prefixes, then extract the season.
    process_raw = prefix_process(season_info, group)
    raw_name, season_raw, season = season_process(process_raw)

    # Titles stay empty strings if name_process rejects the input.
    name_en, name_zh, name_jp = "", "", ""
    try:
        name_en, name_zh, name_jp = name_process(raw_name)
    except ValueError:
        pass

    # Episode number: first run of digits in the episode token, else 0.
    raw_episode = EPISODE_RE.search(episode_info)
    episode = 0
    if raw_episode is not None:
        episode = int(raw_episode.group())

    # Whatever follows the episode token carries the tags.
    sub, dpi, source = find_tags(other)
    return (
        name_en,
        name_zh,
        name_jp,
        season,
        season_raw,
        episode,
        sub,
        dpi,
        source,
        group,
    )
def raw_parser(raw: str) -> Episode | None:
    """Parse *raw* with ``process`` and wrap the result in an ``Episode``.

    Returns None, after logging an error, when ``process`` yields None.
    """
    parsed = process(raw)
    if parsed is not None:
        (
            title_en,
            title_zh,
            title_jp,
            season,
            season_raw,
            episode,
            sub,
            resolution,
            source,
            group,
        ) = parsed
        # The tuple ends with the group, while Episode lists it before the
        # resolution; keywords keep the mapping explicit.
        return Episode(
            title_en=title_en,
            title_zh=title_zh,
            title_jp=title_jp,
            season=season,
            season_raw=season_raw,
            episode=episode,
            sub=sub,
            group=group,
            resolution=resolution,
            source=source,
        )
    logger.error(f"Parser cannot analyse {raw}")
    return None
# Shared HTTP session; _get_bangumi_name sends every bgm.tv request through it.
s = requests.Session()
# Browser-like headers sent with every bgm.tv search request.
_BGM_SEARCH_HEADERS = {
    "accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "cache-control": "no-cache",
    "pragma": "no-cache",
    "sec-ch-ua": "\"Not.A/Brand\";v=\"8\", \"Chromium\";v=\"114\", \"Microsoft Edge\";v=\"114\"",
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": "\"Windows\"",
    "sec-fetch-dest": "document",
    "sec-fetch-mode": "navigate",
    "sec-fetch-site": "none",
    "sec-fetch-user": "?1",
    "upgrade-insecure-requests": "1",
}


def _get_bangumi_name(title: str) -> str:
    """Look *title* up on bgm.tv and return the first hit's ``name_cn``.

    Every call sleeps three seconds before sending the request.

    Raises:
        requests.HTTPError: on a non-success HTTP status.
        JSONDecodeError: if the body is not JSON; the raw body is saved to
            ``error.html`` in the working directory first.
        Exception: anything raised while reading ``list[0].name_cn`` from the
            decoded body (for instance when the search has no results).
    """
    import time

    # Fixed pause between requests; presumably this keeps the script under
    # the API's rate limit -- confirm against the bgm.tv API terms.
    time.sleep(3)
    # safe="" also percent-encodes "/", so a title such as "Fate/Zero" stays
    # one path segment instead of changing the request path.
    r = s.get(
        f"https://api.bgm.tv/search/subject/{quote(title, safe='')}?type=2",
        headers=_BGM_SEARCH_HEADERS,
        timeout=10,
    )
    r.raise_for_status()
    try:
        data = r.json()
        return data["list"][0]["name_cn"]
    except JSONDecodeError:
        logger.error(f"Cannot decode json for {r.url}")
        # Dump the raw bytes: re-encoding r.text as iso-8859-1 raises
        # UnicodeEncodeError on non-Latin-1 text and would hide this error.
        with open("error.html", "wb") as f:
            f.write(r.content)
        # The original author's note here reads "hit time limit".
        raise
    except Exception:
        logger.warning(f"Cannot find bangumi info for {title}")
        raise
@lru_cache
def get_bangumi_name(title: str) -> str:
    """Resolve *title* to its bgm.tv name, retrying with shortened titles.

    The full title is tried first. On failure the title is cut at the first
    "(僅限" (ASCII or full-width parenthesis), then at the first space, then at
    the first "-", and each shortened form is looked up in turn. The first
    successful lookup is returned.

    Successful results are memoised by ``lru_cache``; failures are not, so a
    title that cannot be resolved is queried again on every call.

    Raises:
        ValueError: if neither the title nor any shortened form resolves.
    """
    try:
        return _get_bangumi_name(title)
    except Exception as err:  # not a bare except: Ctrl-C must still work
        last_error = err

    candidate = title
    for tok in ("(僅限", "(僅限", " ", "-"):
        if tok not in candidate:
            continue
        shortened = candidate.split(tok)[0]
        if not shortened:
            # The title starts with the token; nothing left to search for.
            continue
        candidate = shortened
        try:
            # Return the hit; the original dropped it and always raised below.
            return get_bangumi_name(candidate)
        except Exception as err:
            last_error = err
    raise ValueError(f"Cannot find bangumi info for {title}") from last_error
def get_full_path(path: str) -> str:
    """Return the absolute, user-expanded form of *path*.

    Raises:
        FileNotFoundError: if the expanded path does not exist.
    """
    full_path = os.path.abspath(os.path.expanduser(path))
    # Call os.path.exists on the expanded path; the original tested the
    # function object itself, which is always truthy, so it never raised.
    if not os.path.exists(full_path):
        raise FileNotFoundError(f"Cannot find {path}")
    return full_path
def move_file(old_path, new_path):
    """Move *old_path* to *new_path*, creating the target directory if needed.

    The move is logged at INFO level before it happens.
    """
    logger.info(f"{old_path} >> {new_path}")
    target_dir = os.path.dirname(new_path)
    # A bare file name has no directory part, and os.makedirs("") fails.
    # exist_ok=True removes the window between an existence check and creation.
    if target_dir:
        os.makedirs(target_dir, exist_ok=True)
    shutil.move(old_path, new_path)
def rename_episode(episode_path: str):
    """Move one episode file into a sub-folder named after its series.

    The file name (without extension) is parsed with ``raw_parser``, the
    series name is resolved with ``get_bangumi_name``, and the file is moved
    to ``<its directory>/<series name>/<file name>``.

    Raises:
        FileNotFoundError: propagated from ``get_full_path``.
        ValueError: if the name cannot be parsed, no series name is found,
            or the series name cannot be used as a folder name.
    """
    fullpath = get_full_path(episode_path)
    dirname, basename = os.path.split(fullpath)
    # Parse the name without its extension.
    fname, _suffix = os.path.splitext(basename)

    episode = raw_parser(fname)
    if episode is None:
        # raw_parser is declared to return None for names it cannot analyse.
        raise ValueError(f"Cannot parse episode info from {basename}")

    _title = episode.title_zh or episode.title_en or episode.title_jp or episode.group
    bgm_name = get_bangumi_name(_title)

    # The name comes from a remote API and becomes a path component: path
    # separators would create nested folders and "." / ".." would escape the
    # directory, so neutralise or reject them.
    folder = bgm_name.strip() if bgm_name else ""
    for separator in filter(None, (os.sep, os.altsep, "/")):
        folder = folder.replace(separator, "_")
    if folder in ("", ".", ".."):
        raise ValueError(f"Unusable bangumi name {bgm_name!r} for {basename}")

    _new_path = os.path.join(dirname, folder, basename)
    move_file(fullpath, _new_path)
if __name__ == "__main__":
    import sys
    import argparse

    # Command line: one positional directory plus an optional --recursive flag.
    cli = argparse.ArgumentParser()
    cli.add_argument("path", help="path to file")
    cli.add_argument(
        "--recursive", action="store_true", help="recursive", default=False
    )
    args = cli.parse_args()
    print(args)
    logging.basicConfig(level=logging.INFO)

    if not args.recursive:
        # Flat mode: only regular, non-hidden files directly inside the path.
        for entry in os.listdir(args.path):
            candidate = os.path.join(args.path, entry)
            if not os.path.isfile(candidate) or entry.startswith("."):
                continue
            try:
                rename_episode(candidate)
            except Exception as err:
                logger.error(f"File: {candidate} Error: {err}")
    else:
        # Recursive mode: walk lazily, so directories are listed only when
        # the walk reaches them; hidden files are skipped.
        for folder, _subdirs, filenames in os.walk(args.path):
            for filename in filenames:
                if filename.startswith("."):
                    continue
                try:
                    rename_episode(os.path.join(folder, filename))
                except Exception as err:
                    logger.error(f"File: {filename} Error: {err}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment