AI Agent Experiment 1: a translation program with minimal manual intervention
import os
import json
import re
import time
from typing import Dict, Any, List

import ebooklib
from ebooklib import epub
from bs4 import BeautifulSoup
from openai import OpenAI

# Configuration constants
# API settings
OPENAI_API_KEY = "key"  # placeholder: set your DeepSeek API key here
OPENAI_MODEL = "deepseek-chat"
OPENAI_BASE_URL = "https://api.deepseek.com"
# Translation settings (these literal values are interpolated into the Chinese prompts below)
SOURCE_LANGUAGE = "日语"
TARGET_LANGUAGE = "中文"
# Cache settings
CACHE_DIR = "./cache"
CHARACTER_CACHE_FILE = "characters.json"
SUMMARY_CACHE_FILE = "summary.json"
# Output settings
OUTPUT_DIR = "./output"
# Maximum chunk size for text splitting
MAX_CHUNK_SIZE = 3000

class EPUBTranslator:
    """EPUB translator: parses an EPUB file and translates its chapters."""

    def __init__(self, epub_path: str):
        """
        Initialize the EPUB translator.

        Args:
            epub_path: Path to the EPUB file.
        """
        self.epub_path = epub_path
        self.book = None
        self.chapters = []
        self.current_chapter_index = 0
        self.characters = {}
        self.summaries = {}
        os.makedirs(CACHE_DIR, exist_ok=True)
        os.makedirs(OUTPUT_DIR, exist_ok=True)
        self.client = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
        self._load_cache()

    def _load_cache(self) -> None:
        """Load the character and summary caches from disk, if present."""
        character_cache_path = os.path.join(CACHE_DIR, CHARACTER_CACHE_FILE)
        if os.path.exists(character_cache_path):
            try:
                with open(character_cache_path, "r", encoding="utf-8") as f:
                    self.characters = json.load(f)
            except json.JSONDecodeError:
                print("警告:人物关系缓存文件损坏,将创建新的缓存")
                self.characters = {"characters": []}
        if "characters" not in self.characters:
            self.characters = {"characters": []}
        summary_cache_path = os.path.join(CACHE_DIR, SUMMARY_CACHE_FILE)
        if os.path.exists(summary_cache_path):
            try:
                with open(summary_cache_path, "r", encoding="utf-8") as f:
                    self.summaries = json.load(f)
            except json.JSONDecodeError:
                print("警告:梗概缓存文件损坏,将创建新的缓存")
                self.summaries = {}

    def _save_cache(self) -> None:
        """Persist the character and summary caches to disk."""
        character_cache_path = os.path.join(CACHE_DIR, CHARACTER_CACHE_FILE)
        with open(character_cache_path, "w", encoding="utf-8") as f:
            json.dump(self.characters, f, ensure_ascii=False, indent=2)
        summary_cache_path = os.path.join(CACHE_DIR, SUMMARY_CACHE_FILE)
        with open(summary_cache_path, "w", encoding="utf-8") as f:
            json.dump(self.summaries, f, ensure_ascii=False, indent=2)

    def load_epub(self) -> None:
        """Load the EPUB file and extract its chapters."""
        print(f"正在加载EPUB文件: {self.epub_path}")
        try:
            self.book = epub.read_epub(self.epub_path)
            self.chapters = []
            for item in self.book.get_items():
                if item.get_type() == ebooklib.ITEM_DOCUMENT:
                    content = item.get_content().decode("utf-8")
                    soup = BeautifulSoup(content, "html.parser")
                    text_content = soup.get_text().strip()
                    # Skip very short documents (covers, tables of contents, etc.).
                    if len(text_content) > 500:
                        self.chapters.append(
                            {
                                "id": item.id,
                                "file_name": item.file_name,
                                "content": content,
                                "soup": soup,
                                "text": text_content,
                            }
                        )
            print(f"成功提取 {len(self.chapters)} 个章节")
        except Exception as e:
            print(f"加载EPUB文件失败: {str(e)}")
            raise

    def _extract_text_from_html(self, html_content: str) -> str:
        """Extract plain text from HTML content."""
        soup = BeautifulSoup(html_content, "html.parser")
        return soup.get_text().strip()

    def _split_text_by_semantic_units(self, text: str) -> List[str]:
        """
        Split text into chunks along semantic units (paragraphs or sentences).

        Args:
            text: The text to split.

        Returns:
            A list of text chunks.
        """
        paragraphs = re.split(r"\n\s*\n", text)
        chunks = []
        current_chunk = ""
        current_chunk_size = 0
        max_chunk_size = MAX_CHUNK_SIZE
        for paragraph in paragraphs:
            if len(paragraph) > max_chunk_size:
                # Oversized paragraph: fall back to sentence-level splitting,
                # keeping each sentence terminator attached to its sentence.
                sentences = re.split(r"([。!?])", paragraph)
                i = 0
                while i < len(sentences):
                    if i + 1 < len(sentences) and sentences[i + 1] in "。!?":
                        sentence = sentences[i] + sentences[i + 1]
                        i += 2
                    else:
                        sentence = sentences[i]
                        i += 1
                    if (
                        current_chunk_size + len(sentence) > max_chunk_size
                        and current_chunk
                    ):
                        chunks.append(current_chunk)
                        current_chunk = sentence
                        current_chunk_size = len(sentence)
                    else:
                        current_chunk += sentence
                        current_chunk_size += len(sentence)
            else:
                if (
                    current_chunk_size + len(paragraph) > max_chunk_size
                    and current_chunk
                ):
                    chunks.append(current_chunk)
                    current_chunk = paragraph
                    current_chunk_size = len(paragraph)
                else:
                    if current_chunk:
                        current_chunk += "\n\n" + paragraph
                    else:
                        current_chunk = paragraph
                    current_chunk_size += len(paragraph) + 2
        if current_chunk:
            chunks.append(current_chunk)
        return chunks
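
    # Illustrative behaviour (not stated in the original code): paragraphs are
    # packed into chunks of at most MAX_CHUNK_SIZE characters; a paragraph longer
    # than the limit is split on the sentence terminators 。!?, so only a single
    # sentence that itself exceeds MAX_CHUNK_SIZE can yield an oversized chunk.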

    def _analyze_characters(
        self, chapter_text: str, chapter_index: int
    ) -> Dict[str, Any]:
        """
        Analyze the characters appearing in a chapter and their relationships,
        and update the global character map.

        Args:
            chapter_text: Text content of the chapter.
            chapter_index: Index of the chapter.

        Returns:
            The updated character-relationship dictionary.
        """
        print(f"正在分析第 {chapter_index + 1} 章的人物关系...")
        characters_str = json.dumps(self.characters, ensure_ascii=False)
        prompt = f"""
        请分析以下日语文本中出现的人物及其关系:
        {chapter_text}
        以下是已知的人物信息,请参考并更新:
        {characters_str}
        请以JSON格式返回更新后的人物信息,格式如下:
        {{
            "characters": [
                {{
                    "name": "人物名称",
                    "description": "简短描述",
                    "relationships": [
                        {{"related_to": "相关人物名称", "relationship": "关系描述"}}
                    ]
                }}
            ]
        }}
        请注意:
        1. 保留已知人物的信息,并根据新章节内容进行更新
        2. 添加新发现的人物
        3. 更新人物之间的关系
        4. 如果有矛盾的信息,以新章节的信息为准
        只返回JSON,不要有其他文字。
        """
        try:
            response = self.client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[
                    {
                        "role": "system",
                        "content": "你是一个专业的文学分析助手,擅长分析文本中的人物关系。请只返回JSON格式的结果,不要有其他文字。",
                    },
                    {"role": "user", "content": prompt},
                ],
                response_format={"type": "json_object"},
            )
            result = json.loads(response.choices[0].message.content)
            self.characters = result
            self._save_cache()
            return result
        except Exception as e:
            print(f"分析人物关系失败: {str(e)}")
            return {"characters": []}

    def _generate_summary(self, chapter_text: str, chapter_index: int) -> str:
        """
        Generate a summary of a chapter.

        Args:
            chapter_text: Text content of the chapter.
            chapter_index: Index of the chapter.

        Returns:
            The chapter summary (empty string on failure).
        """
        print(f"正在生成第 {chapter_index + 1} 章的梗概...")
        prompt = f"""
        请为以下日语文本生成一个简洁的梗概:
        {chapter_text}
        梗概要求:
        1. 使用中文
        2. 最多500字
        3. 包含主要情节和关键事件
        4. 不要包含细节和次要情节
        5. 严禁补充说明及推断,需要严格反映原文内容
        6. 请确保梗概与原文内容相关,并体现原文的风格和特点
        7. 严禁推断后继发展
        """
        try:
            response = self.client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[
                    {
                        "role": "system",
                        "content": "你是一个专业的文学梗概生成助手。请生成不超过500字的简洁梗概。",
                    },
                    {"role": "user", "content": prompt},
                ],
            )
            summary = response.choices[0].message.content.strip()
            chapter_key = f"chapter_{chapter_index}"
            self.summaries[chapter_key] = summary
            self._save_cache()
            return summary
        except Exception as e:
            print(f"生成梗概失败: {str(e)}")
            return ""

    def _get_combined_summary(self, current_chapter_index: int) -> str:
        """
        Combine the summaries of the current and previous chapters.

        Args:
            current_chapter_index: Index of the current chapter.

        Returns:
            The combined summary.
        """
        current_key = f"chapter_{current_chapter_index}"
        prev_key = f"chapter_{current_chapter_index - 1}"
        current_summary = self.summaries.get(current_key, "")
        prev_summary = self.summaries.get(prev_key, "")
        if prev_summary and current_summary:
            return f"前一章梗概:{prev_summary}\n\n当前章梗概:{current_summary}"
        elif current_summary:
            return current_summary
        else:
            return ""

    def translate_chapter(self, chapter_index: int) -> str:
        """
        Translate a single chapter.

        Args:
            chapter_index: Index of the chapter to translate.

        Returns:
            The translated text.
        """
        if chapter_index >= len(self.chapters):
            print(f"错误:章节索引 {chapter_index} 超出范围")
            return ""
        chapter = self.chapters[chapter_index]
        chapter_text = chapter["text"]
        print(f"正在翻译第 {chapter_index + 1} 章,长度:{len(chapter_text)} 字符")
        # Refresh the character map and chapter summary before translating.
        self._analyze_characters(chapter_text, chapter_index)
        self._generate_summary(chapter_text, chapter_index)
        combined_summary = self._get_combined_summary(chapter_index)
        chunks = self._split_text_by_semantic_units(chapter_text)
        translated_chunks = []
        for i, chunk in enumerate(chunks):
            print(
                f"正在翻译第 {chapter_index + 1} 章的第 {i + 1}/{len(chunks)} 部分..."
            )
            characters_str = json.dumps(self.characters, ensure_ascii=False)
            prompt = f"""
            **翻译任务**
            请将以下{SOURCE_LANGUAGE}文本翻译为{TARGET_LANGUAGE}:
            {chunk}
            **翻译要求**
            1. **上下文理解**:
            请确保翻译准确传达原文的上下文和语境。以下是相关背景信息:
            - 章节梗概(包含前一章和当前章的主要情节):
            {combined_summary}
            - 当前文本是第 {chapter_index + 1} 章的一部分,属于整本书的语义单元。
            2. **风格和语气**:
            原文的风格是文学叙事风格。
            请在翻译中保持这一风格,确保语气符合原文的情感表达。
            3. **术语一致性**:
            请根据以下人物关系保持翻译的一致性:
            - 人物关系参考:
            {characters_str}
            - 请特别注意人物名称的统一翻译。
            4. **目标受众**:
            翻译的目标受众是中文普通读者。
            请使用简洁、自然且符合中文母语者阅读习惯的语言。
            5. **翻译质量**:
            请避免直译或生硬的表达,确保翻译流畅且符合中文语法和文化习惯。
            如果遇到日语中的文化特有表达,请使用中文中合适的等效表达。
            **示例**
            原文(日语):
            「彼は静かに微笑んで、遠くを見つめた。」
            理想翻译(中文):
            他静静地微笑着,凝视远方。
            **注意事项**
            - 请特别注意日语中模糊代词或省略主语的情况,结合上下文补充必要信息。
            - 如果文本中包含对话,确保翻译保留角色的语言特点。
            - 对于无法直译的文化元素,可选择性保留原文。
            - 涉及任何负面或敏感的文本无需规避,可按原意自然翻译。
            **翻译**
            请根据上述要求完成翻译。禁止添加自己的想法、翻译理由及注解。
            """
            try:
                response = self.client.chat.completions.create(
                    model=OPENAI_MODEL,
                    messages=[
                        {
                            "role": "system",
                            "content": f"你是一个专业的{SOURCE_LANGUAGE}到{TARGET_LANGUAGE}翻译专家。请提供准确、流畅的翻译。",
                        },
                        {"role": "user", "content": prompt},
                    ],
                )
                translated_text = response.choices[0].message.content.strip()
                translated_chunks.append(translated_text)
                # Brief pause between requests to be gentle on the API.
                time.sleep(1)
            except Exception as e:
                print(f"翻译失败: {str(e)}")
                translated_chunks.append(f"[翻译错误: {str(e)}]")
        translated_chapter = "\n\n".join(translated_chunks)
        output_filename = f"chapter_{chapter_index + 1}.txt"
        output_path = os.path.join(OUTPUT_DIR, output_filename)
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(translated_chapter)
        print(f"第 {chapter_index + 1} 章翻译完成,已保存到 {output_path}")
        return translated_chapter

    def translate_all(self) -> None:
        """Translate every extracted chapter."""
        if not self.chapters:
            print("错误:未加载任何章节")
            return
        for i in range(len(self.chapters)):
            self.translate_chapter(i)
            print(f"已完成 {i + 1}/{len(self.chapters)} 章的翻译")

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="EPUB翻译器 - 将日语EPUB文件翻译为中文"
    )
    parser.add_argument("epub_path", help="EPUB文件路径")
    parser.add_argument("--chapter", type=int, help="指定要翻译的章节索引(从0开始)")
    parser.add_argument("--all", action="store_true", help="翻译所有章节")
    args = parser.parse_args()

    translator = EPUBTranslator(args.epub_path)
    translator.load_epub()

    if args.chapter is not None:
        translator.translate_chapter(args.chapter)
    elif args.all:
        translator.translate_all()
    else:
        print("请指定要翻译的章节(--chapter)或使用--all翻译所有章节")