Created: March 22, 2025 12:15
PTT Article Crawler (PTT文章爬蟲)
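A crawler for PTT's web interface: it walks a board's index pages backwards from the newest page, opens each article to read its posting date and title, and stores the article ID, date, title, and push count in a local SQLite database, with periodic JSON backups and skipping of articles the database already covers.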
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timedelta, date
import logging
from pathlib import Path
import json
import time
import re
import yaml
import sqlite3
import sys
from dataclasses import dataclass
from functools import wraps
from typing import List, Tuple, Optional, Dict, TypeVar

T = TypeVar('T')

# Constant settings
CONSTANTS = {
    'BASE_URL': 'https://www.ptt.cc',
    'HEADERS': {
        'cookie': 'over18=1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    },
    'TIMEOUT': (5, 15),
    'RETRY_ATTEMPTS': 5,
    'BACKUP_INTERVAL': timedelta(hours=1),
    'POSTS_PER_BATCH': 500,
    'REQUEST_DELAY': 1.0,  # delay between requests to avoid hammering the server
}
@dataclass
class Post:
    date: date
    title: str
    push_count: int
    article_id: str


def retry_on_exception(retries: int = 3, delay: float = 1.0):
    """Retry the wrapped function up to `retries` times, sleeping `delay * attempt` between tries."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i == retries - 1:
                        raise
                    logging.warning(f"{func.__name__} failed: {e}, retrying... ({i+1}/{retries})")
                    time.sleep(delay * (i + 1))
            return None
        return wrapper
    return decorator
class PTTCrawler:
    def __init__(self, board_name: str, db_name: str, config_path: Optional[str] = None, end_date: Optional[date] = None):
        self.config = load_config(config_path)
        self.board_name: str = board_name
        self.db_path: Path = Path(db_name)
        self.base_url: str = self.config['BASE_URL']
        self.headers: Dict[str, str] = self.config['HEADERS']
        self.end_date: date = end_date or datetime(2020, 1, 1).date()
        self.posts_per_batch: int = self.config['POSTS_PER_BATCH']
        self.backup_interval: timedelta = self.config['BACKUP_INTERVAL']
        self.last_backup_time: datetime = datetime.now()
        self.all_posts: List[Post] = []
        self.session = self._create_session()
        self._setup_logging()
        self._setup_database()
        self.last_article_id, self.last_article_date = self._get_earliest_article_info()
        self.current_url: str = self._get_start_url()
        logging.info(f"Initialization complete, starting URL: {self.current_url}")
        if self.last_article_id and self.last_article_date:
            logging.info(f"Oldest article in database: ID={self.last_article_id}, Date={self.last_article_date}")
        else:
            logging.info("No existing articles in database, crawling from the newest page")
    def _create_session(self) -> requests.Session:
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(max_retries=self.config['RETRY_ATTEMPTS'])
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _setup_logging(self) -> None:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / f"ptt_crawler_{self.board_name}.log"
        handler = logging.FileHandler(log_file, encoding='utf-8')
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s")
        handler.setFormatter(formatter)
        logging.basicConfig(level=logging.INFO, handlers=[handler, logging.StreamHandler(sys.stdout)])

    def _setup_database(self) -> None:
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS posts (
                article_id TEXT PRIMARY KEY,
                date TEXT,
                title TEXT,
                push_count INTEGER
            )
        ''')
        self.conn.commit()
        logging.info(f"Database set up: {self.db_path}")
    def _get_earliest_article_info(self) -> Tuple[Optional[str], Optional[date]]:
        try:
            self.cursor.execute("SELECT article_id, date FROM posts ORDER BY date ASC LIMIT 1")
            result = self.cursor.fetchone()
            if result:
                article_id, date_str = result
                try:
                    article_date = datetime.strptime(date_str, '%Y-%m-%d').date()
                    logging.info(f"Read oldest article from database: ID={article_id}, Date={article_date}")
                    return article_id, article_date
                except ValueError as e:
                    logging.error(f"Invalid date format in database: {date_str}, {e}")
                    return None, None
            else:
                logging.info("No existing articles in database")
                return None, None
        except sqlite3.Error as e:
            logging.error(f"Database query failed: {e}")
            return None, None

    def _get_start_url(self) -> str:
        return f"{self.base_url}/bbs/{self.board_name}/index.html"

    def _extract_article_id(self, url: str) -> Optional[str]:
        try:
            match = re.search(r'M\.(\d+)\.', url)
            return match.group(1) if match else None
        except Exception as e:
            logging.warning(f"Could not extract article ID from URL: {url}, error: {e}")
            return None
    @retry_on_exception(retries=3, delay=1.0)
    def _fetch_page(self, url: str) -> Optional[BeautifulSoup]:
        try:
            response = self.session.get(url, headers=self.headers, timeout=self.config['TIMEOUT'])
            response.raise_for_status()
            logging.info(f"Fetched page: {url}")
            return BeautifulSoup(response.text, "lxml")
        except Exception as e:
            logging.error(f"Failed to fetch page: {url}, error: {e}")
            return None
    def _parse_article_content(self, article_url: str, article_id: str, push_count: int) -> Optional[Post]:
        """Fetch the title and date from the article page itself."""
        soup = self._fetch_page(article_url)
        if not soup:
            return None
        try:
            # Extract the date (stored in the .article-meta-value tags of the article header)
            meta_tags = soup.select(".article-meta-value")
            if len(meta_tags) < 4:
                logging.warning(f"Article {article_id} is missing complete meta information")
                return None
            date_str = meta_tags[3].text.strip()  # the timestamp is normally the fourth meta value
            article_date = datetime.strptime(date_str, "%a %b %d %H:%M:%S %Y").date()
            # Extract the title (the first <b> inside #main-content, falling back to the <title> tag)
            title_tag = soup.select_one("#main-content b")
            title = title_tag.text.strip() if title_tag else soup.title.text.split(" - ")[0].strip()
            logging.debug(f"Parsed from article page: ID={article_id}, Date={article_date}, Title={title}")
            return Post(article_date, title, push_count, article_id)
        except Exception as e:
            logging.warning(f"Failed to parse article content {article_url}: {e}")
            return None
    def _process_push_count(self, push_str: str) -> int:
        # PTT shows '爆' for 100+ pushes, 'X' markers for heavily downvoted posts,
        # and a plain number otherwise; anything unrecognised falls back to 0.
        push_map = {
            '爆': 100,
            'X': -10,
            '━': 0
        }
        if not push_str:
            return 0
        return push_map.get(push_str, int(push_str) if push_str.isdigit() else 0)

    def _should_process_article(self, article_id: str, article_date: date) -> bool:
        if not self.last_article_id or not self.last_article_date:
            logging.info(f"No history in database, processing article: ID={article_id}, Date={article_date}")
            return True
        try:
            if article_date > self.last_article_date:
                logging.info(f"Article {article_id} ({article_date}) is newer than stored data, processing")
                return True
            elif article_date == self.last_article_date and int(article_id) > int(self.last_article_id):
                logging.info(f"Article {article_id} is from the same day but has a larger ID, processing")
                return True
            else:
                logging.info(f"Article {article_id} ({article_date}) is already covered by the database, skipping")
                return False
        except ValueError as e:
            logging.warning(f"Could not compare article IDs: {article_id}, error: {e}")
            return False
    def _process_page(self, soup: BeautifulSoup) -> Tuple[List[Post], bool]:
        posts: List[Post] = []
        reached_end_date = False
        try:
            entries = soup.select(".r-ent")
            if not entries:
                logging.warning("No article entries (.r-ent) found")
                return posts, True
            logging.info(f"Found {len(entries)} article entries")
            # The index lists articles oldest-first, so iterate in reverse (newest first)
            for entry in reversed(entries):
                title_tag = entry.select_one(".title a")
                push_tag = entry.select_one(".nrec")
                if not title_tag:
                    logging.warning("Article entry is missing a title tag")
                    continue
                article_url = self.base_url + title_tag.get('href', '')
                article_id = self._extract_article_id(article_url)
                if not article_id:
                    logging.warning(f"Could not extract article ID: {article_url}")
                    continue
                push_count = self._process_push_count(push_tag.text.strip() if push_tag else "")
                post = self._parse_article_content(article_url, article_id, push_count)
                if not post:
                    continue
                if post.date < self.end_date:
                    # The crawl walks backwards in time, so a post older than the
                    # cut-off date means this is the last page we need.
                    logging.info(f"Article date {post.date} is before end date {self.end_date}, skipping")
                    reached_end_date = True
                    continue
                if not self._should_process_article(article_id, post.date):
                    continue
                posts.append(post)
                logging.info(f"Added article: ID={article_id}, Title={post.title}, Date={post.date}")
            # Restore page order (oldest to newest) before saving
            posts.reverse()
        except Exception as e:
            logging.error(f"Failed to process page: {e}")
        return posts, reached_end_date
    def _get_next_page_url(self, soup: BeautifulSoup) -> Optional[str]:
        try:
            links = soup.select('.btn-group-paging a')
            for link in links:
                # "上頁" is the text of PTT's "previous page" button, which leads to older articles
                if "上頁" in link.text and link.has_attr("href"):
                    next_url = f"{self.base_url}{link['href']}"
                    logging.info(f"Found next page URL: {next_url}")
                    return next_url
            logging.info("No next page URL found")
            return None
        except Exception as e:
            logging.error(f"Failed to get next page URL: {e}")
            return None

    def _should_backup(self) -> bool:
        return datetime.now() - self.last_backup_time >= self.backup_interval

    def _save_to_database(self, posts: List[Post]) -> None:
        try:
            self.cursor.executemany('''
                INSERT OR IGNORE INTO posts (article_id, date, title, push_count)
                VALUES (?, ?, ?, ?)
            ''', [(post.article_id, post.date.strftime("%Y-%m-%d"), post.title, post.push_count) for post in posts])
            self.conn.commit()
            logging.info(f"Saved {len(posts)} records to the database")
            self.last_backup_time = datetime.now()
        except Exception as e:
            logging.error(f"Database save failed: {e}")
            self._create_backup(posts)

    def _create_backup(self, posts: List[Post]) -> None:
        backup_dir = Path("backup")
        backup_dir.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = backup_dir / f"{self.board_name}_{timestamp}.json"
        try:
            with open(backup_path, 'w', encoding='utf-8') as f:
                # default=str serialises the date field, which json cannot handle natively
                json.dump([post.__dict__ for post in posts], f, ensure_ascii=False, indent=2, default=str)
            logging.info(f"Created backup file: {backup_path}")
        except Exception as e:
            logging.error(f"Failed to create backup file: {e}")
    def crawl(self) -> None:
        logging.info(f"Starting crawl of board {self.board_name}, end date set to {self.end_date}")
        current_url = self.current_url
        page_count = 0
        while current_url:
            soup = self._fetch_page(current_url)
            if not soup:
                logging.error(f"Could not fetch page: {current_url}, aborting crawl")
                break
            posts, reached_end_date = self._process_page(soup)
            if posts:
                self.all_posts.extend(posts)
                logging.info(f"Found {len(posts)} articles on this page, {len(self.all_posts)} in total")
                if len(self.all_posts) >= self.posts_per_batch or self._should_backup():
                    self._save_to_database(self.all_posts)
                    self.all_posts = []
            page_count += 1
            if reached_end_date:
                logging.info("Reached the end date, stopping crawl")
                break
            next_url = self._get_next_page_url(soup)
            if not next_url:
                logging.info("No next page, crawl finished")
                break
            current_url = next_url
            time.sleep(self.config['REQUEST_DELAY'])
        if self.all_posts:
            self._save_to_database(self.all_posts)
        logging.info(f"Crawl complete, processed {page_count} pages")
def load_config(config_path: Optional[str] = None) -> Dict:
    default_config = {
        'board_name': 'Stock',
        'db_name': 'ptt_stock_data.db',
        'end_date': '2020-01-01'
    }
    if config_path:
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f) or {}
            return {**CONSTANTS, **default_config, **config}
        except Exception as e:
            logging.warning(f"Could not load config file {config_path}: {e}, using defaults")
    return {**CONSTANTS, **default_config}
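# Example config.yaml (an assumed layout; no sample file ships with this gist).
# Any key from CONSTANTS or default_config above can be overridden here, e.g.:
#
#     board_name: Stock
#     db_name: ptt_stock_data.db
#     end_date: '2020-01-01'
#     REQUEST_DELAY: 2.0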
if __name__ == "__main__":
    try:
        config_path = 'config.yaml'
        config = load_config(config_path)
        # end_date may arrive from YAML as a string or already parsed as a date
        if isinstance(config['end_date'], str):
            end_date = datetime.strptime(config['end_date'], '%Y-%m-%d').date()
        elif isinstance(config['end_date'], date):
            end_date = config['end_date']
        else:
            raise ValueError(f"Invalid end_date format: {config['end_date']}")
        crawler = PTTCrawler(
            board_name=config['board_name'],
            db_name=config['db_name'],
            config_path=config_path,
            end_date=end_date
        )
        crawler.crawl()
    except Exception as e:
        logging.error(f"Program execution failed: {e}")
        raise
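# Usage sketch for inspecting the crawled data (kept as a comment so it does not run
# as part of this script; assumes the default 'ptt_stock_data.db' from default_config):
#
#     import sqlite3
#     conn = sqlite3.connect('ptt_stock_data.db')
#     for date_str, push_count, title in conn.execute(
#             "SELECT date, push_count, title FROM posts ORDER BY push_count DESC LIMIT 10"):
#         print(f"{date_str} [{push_count:>3}] {title}")
#     conn.close()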