@ehackerevan
Created March 22, 2025 12:15
PTT article crawler (PTT 文章爬蟲)
import requests
from bs4 import BeautifulSoup
from datetime import datetime, date, timedelta
import logging
from pathlib import Path
import json
import time
import re
import yaml
import sqlite3
import sys
from dataclasses import dataclass
from functools import wraps
from typing import List, Tuple, Optional, Dict
# Constants
CONSTANTS = {
    'BASE_URL': 'https://www.ptt.cc',
    'HEADERS': {
        'cookie': 'over18=1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    },
    'TIMEOUT': (5, 15),
    'RETRY_ATTEMPTS': 5,
    'BACKUP_INTERVAL': timedelta(hours=1),
    'POSTS_PER_BATCH': 500,
    'REQUEST_DELAY': 1.0,  # delay between requests to avoid hammering the server
}


@dataclass
class Post:
    date: date
    title: str
    push_count: int
    article_id: str
def retry_on_exception(retries: int = 3, delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if i == retries - 1:
                        raise
                    logging.warning(f"{func.__name__} failed: {e}, retrying... ({i+1}/{retries})")
                    time.sleep(delay * (i + 1))
            return None
        return wrapper
    return decorator
class PTTCrawler:
    def __init__(self, board_name: str, db_name: str, config_path: Optional[str] = None, end_date: Optional[date] = None):
        self.config = load_config(config_path)
        self.board_name: str = board_name
        self.db_path: Path = Path(db_name)
        self.base_url: str = self.config['BASE_URL']
        self.headers: Dict[str, str] = self.config['HEADERS']
        self.end_date: date = end_date or date(2020, 1, 1)
        self.posts_per_batch: int = self.config['POSTS_PER_BATCH']
        self.backup_interval: timedelta = self.config['BACKUP_INTERVAL']
        self.last_backup_time: datetime = datetime.now()
        self.all_posts: List[Post] = []
        self.session = self._create_session()
        self._setup_logging()
        self._setup_database()
        self.last_article_id, self.last_article_date = self._get_earliest_article_info()
        self.current_url: str = self._get_start_url()
        logging.info(f"Initialization complete, start URL: {self.current_url}")
        if self.last_article_id and self.last_article_date:
            logging.info(f"Oldest article in database: ID={self.last_article_id}, Date={self.last_article_date}")
        else:
            logging.info("No existing articles in database, crawling from the newest page")
    def _create_session(self) -> requests.Session:
        session = requests.Session()
        adapter = requests.adapters.HTTPAdapter(max_retries=self.config['RETRY_ATTEMPTS'])
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _setup_logging(self) -> None:
        log_dir = Path("logs")
        log_dir.mkdir(exist_ok=True)
        log_file = log_dir / f"ptt_crawler_{self.board_name}.log"
        handler = logging.FileHandler(log_file, encoding='utf-8')
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s")
        handler.setFormatter(formatter)
        logging.basicConfig(level=logging.INFO, handlers=[handler, logging.StreamHandler(sys.stdout)])
    def _setup_database(self) -> None:
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS posts (
                article_id TEXT PRIMARY KEY,
                date TEXT,
                title TEXT,
                push_count INTEGER
            )
        ''')
        self.conn.commit()
        logging.info(f"Database ready: {self.db_path}")
    def _get_earliest_article_info(self) -> Tuple[Optional[str], Optional[date]]:
        try:
            self.cursor.execute("SELECT article_id, date FROM posts ORDER BY date ASC LIMIT 1")
            result = self.cursor.fetchone()
            if result:
                article_id, date_str = result
                try:
                    article_date = datetime.strptime(date_str, '%Y-%m-%d').date()
                    logging.info(f"Oldest article read from database: ID={article_id}, Date={article_date}")
                    return article_id, article_date
                except ValueError as e:
                    logging.error(f"Malformed date in database: {date_str}, {e}")
                    return None, None
            else:
                logging.info("No existing articles in database")
                return None, None
        except sqlite3.Error as e:
            logging.error(f"Database query failed: {e}")
            return None, None

    def _get_start_url(self) -> str:
        return f"{self.base_url}/bbs/{self.board_name}/index.html"

    def _extract_article_id(self, url: str) -> Optional[str]:
        try:
            match = re.search(r'M\.(\d+)\.', url)
            return match.group(1) if match else None
        except Exception as e:
            logging.warning(f"Could not extract article ID from URL: {url}, error: {e}")
            return None
    @retry_on_exception(retries=3, delay=1.0)
    def _fetch_page(self, url: str) -> Optional[BeautifulSoup]:
        try:
            response = self.session.get(url, headers=self.headers, timeout=self.config['TIMEOUT'])
            response.raise_for_status()
            logging.info(f"Fetched page: {url}")
            return BeautifulSoup(response.text, "lxml")
        except Exception as e:
            logging.error(f"Failed to fetch page: {url}, error: {e}")
            return None

    def _parse_article_content(self, article_url: str, article_id: str, push_count: int) -> Optional[Post]:
        """Extract the title and date from the article page itself."""
        soup = self._fetch_page(article_url)
        if not soup:
            return None
        try:
            # The date lives in the .article-meta-value tags of the article header.
            meta_tags = soup.select(".article-meta-value")
            if len(meta_tags) < 4:
                logging.warning(f"Article {article_id} is missing complete meta information")
                return None
            date_str = meta_tags[3].text.strip()  # the date is normally the 4th meta value
            article_date = datetime.strptime(date_str, "%a %b %d %H:%M:%S %Y").date()
            # The title is the first <b> inside #main-content, falling back to the <title> tag.
            title_tag = soup.select_one("#main-content b")
            title = title_tag.text.strip() if title_tag else soup.title.text.split(" - ")[0].strip()
            logging.debug(f"Parsed article page: ID={article_id}, Date={article_date}, Title={title}")
            # The end-date cutoff is applied in _process_page so the crawl can stop once it is reached.
            return Post(article_date, title, push_count, article_id)
        except Exception as e:
            logging.warning(f"Failed to parse article content {article_url}: {e}")
            return None
    def _process_push_count(self, push_str: str) -> int:
        # On PTT, '爆' marks 100 or more pushes and 'X' marks a negative score.
        push_map = {
            '爆': 100,
            'X': -10,
            '━': 0
        }
        if not push_str:
            return 0
        return push_map.get(push_str, int(push_str) if push_str.isdigit() else 0)
    def _should_process_article(self, article_id: str, article_date: date) -> bool:
        if not self.last_article_id or not self.last_article_date:
            logging.info(f"No history, processing article: ID={article_id}, Date={article_date}")
            return True
        try:
            if article_date > self.last_article_date:
                logging.info(f"Article {article_id} ({article_date}) is newer than the stored data, processing")
                return True
            elif article_date == self.last_article_date and int(article_id) > int(self.last_article_id):
                logging.info(f"Article {article_id} has the same date but a larger ID, processing")
                return True
            else:
                logging.info(f"Article {article_id} ({article_date}) is already covered by the database, skipping")
                return False
        except ValueError as e:
            logging.warning(f"Could not compare article IDs: {article_id}, error: {e}")
            return False
    def _process_page(self, soup: BeautifulSoup) -> Tuple[List[Post], bool]:
        posts: List[Post] = []
        reached_end_date = False
        try:
            entries = soup.select(".r-ent")
            if not entries:
                logging.warning("No article entries found (.r-ent)")
                return posts, True
            logging.info(f"Found {len(entries)} article entries")
            # Walk the page entries from oldest to newest.
            for entry in reversed(entries):
                title_tag = entry.select_one(".title a")
                push_tag = entry.select_one(".nrec")
                if not title_tag:
                    logging.warning("Entry is missing its title tag")
                    continue
                article_url = self.base_url + title_tag.get('href', '')
                article_id = self._extract_article_id(article_url)
                if not article_id:
                    logging.warning(f"Could not extract article ID: {article_url}")
                    continue
                push_count = self._process_push_count(push_tag.text.strip() if push_tag else "")
                post = self._parse_article_content(article_url, article_id, push_count)
                if not post:
                    continue
                if post.date < self.end_date:
                    logging.info(f"Article date {post.date} is before the end date {self.end_date}, skipping")
                    reached_end_date = True
                    continue
                if not self._should_process_article(article_id, post.date):
                    continue
                posts.append(post)
                logging.info(f"Added article: ID={article_id}, Title={post.title}, Date={post.date}")
            # Reverse back so posts are stored newest-first.
            posts.reverse()
        except Exception as e:
            logging.error(f"Failed to process page: {e}")
        return posts, reached_end_date
    def _get_next_page_url(self, soup: BeautifulSoup) -> Optional[str]:
        try:
            links = soup.select('.btn-group-paging a')
            for link in links:
                # "上頁" is the "previous page" button, which leads to older posts.
                if "上頁" in link.text and link.has_attr("href"):
                    next_url = f"{self.base_url}{link['href']}"
                    logging.info(f"Found next page URL: {next_url}")
                    return next_url
            logging.info("No next page URL found")
            return None
        except Exception as e:
            logging.error(f"Failed to get next page URL: {e}")
            return None

    def _should_backup(self) -> bool:
        return datetime.now() - self.last_backup_time >= self.backup_interval
    def _save_to_database(self, posts: List[Post]) -> None:
        try:
            self.cursor.executemany('''
                INSERT OR IGNORE INTO posts (article_id, date, title, push_count)
                VALUES (?, ?, ?, ?)
            ''', [(post.article_id, post.date.strftime("%Y-%m-%d"), post.title, post.push_count) for post in posts])
            self.conn.commit()
            logging.info(f"Saved {len(posts)} rows to the database")
            self.last_backup_time = datetime.now()
        except Exception as e:
            logging.error(f"Database save failed: {e}")
            self._create_backup(posts)

    def _create_backup(self, posts: List[Post]) -> None:
        backup_dir = Path("backup")
        backup_dir.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        backup_path = backup_dir / f"{self.board_name}_{timestamp}.json"
        try:
            with open(backup_path, 'w', encoding='utf-8') as f:
                # default=str serializes the date field, which json cannot handle natively
                json.dump([post.__dict__ for post in posts], f, ensure_ascii=False, indent=2, default=str)
            logging.info(f"Backup file created: {backup_path}")
        except Exception as e:
            logging.error(f"Failed to create backup file: {e}")
    def crawl(self) -> None:
        logging.info(f"Starting crawl of board {self.board_name}, end date set to {self.end_date}")
        current_url = self.current_url
        page_count = 0
        while current_url:
            soup = self._fetch_page(current_url)
            if not soup:
                logging.error(f"Could not fetch page: {current_url}, aborting crawl")
                break
            posts, reached_end_date = self._process_page(soup)
            if posts:
                self.all_posts.extend(posts)
                logging.info(f"Found {len(posts)} articles on this page, {len(self.all_posts)} pending in total")
            if len(self.all_posts) >= self.posts_per_batch or self._should_backup():
                self._save_to_database(self.all_posts)
                self.all_posts = []
            page_count += 1
            if reached_end_date:
                logging.info("Reached the end date, stopping crawl")
                break
            next_url = self._get_next_page_url(soup)
            if not next_url:
                logging.info("No next page, crawl finished")
                break
            current_url = next_url
            time.sleep(self.config['REQUEST_DELAY'])
        if self.all_posts:
            self._save_to_database(self.all_posts)
        logging.info(f"Crawl complete, processed {page_count} pages")
def load_config(config_path: Optional[str] = None) -> Dict:
    default_config = {
        'board_name': 'Stock',
        'db_name': 'ptt_stock_data.db',
        'end_date': '2020-01-01'
    }
    if config_path:
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f) or {}
            return {**CONSTANTS, **default_config, **config}
        except Exception as e:
            logging.warning(f"Could not load config file {config_path}: {e}, using defaults")
    return {**CONSTANTS, **default_config}
if __name__ == "__main__":
    try:
        config_path = 'config.yaml'
        config = load_config(config_path)
        if isinstance(config['end_date'], str):
            end_date = datetime.strptime(config['end_date'], '%Y-%m-%d').date()
        elif isinstance(config['end_date'], date):
            end_date = config['end_date']
        else:
            raise ValueError(f"Invalid end_date format: {config['end_date']}")
        crawler = PTTCrawler(
            board_name=config['board_name'],
            db_name=config['db_name'],
            config_path=config_path,
            end_date=end_date
        )
        crawler.crawl()
    except Exception as e:
        logging.error(f"Program failed: {e}")
        raise
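
The script looks for a config.yaml next to it; load_config() layers that file's keys over CONSTANTS and the built-in defaults, so any of those keys can be overridden. A minimal sketch of such a file, with illustrative values only: the __main__ block reads board_name, db_name, and end_date (end_date may be a quoted string or a bare YAML date, since both branches are handled).

# config.yaml (example values)
board_name: Stock
db_name: ptt_stock_data.db
end_date: '2020-01-01'
# Optional: any CONSTANTS key can be overridden the same way, e.g.
REQUEST_DELAY: 1.5

Results land in the posts table of the SQLite file named by db_name, one row per article (article_id, date, title, push_count).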