-
-
Save wzyboy/49c25333a550903d180563fdea8f0c90 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import re | |
import sys | |
import json | |
import hashlib | |
from pathlib import Path | |
from datetime import datetime | |
import pymysql | |
from pymysql.cursors import DictCursor | |
from collections.abc import Iterator | |
# Host name of the site the comments belong to; it is used to build the
# absolute "href" of each exported comment.
# You may also need to update post_id_to_url().
DOMAIN = 'wzyboy.im'
class Converter:
    """Export the comments of a WordPress database to a JSON file.

    Rows are read from the ``wp_comments`` table and turned into flat dicts
    (``_id``, ``nick``, ``mail``, ``pid``, ``rid``, ...).
    NOTE(review): the field names look like the Twikoo comment schema --
    confirm against the importer that consumes the file.
    """

    # Reply-mention anchor emitted by WordPress themes, e.g.
    # <a href="#comment-36" rel="nofollow">@ym</a>; group 1 is the "@name".
    # "[^>]*" keeps the match inside a single opening tag and ".+?" stops at
    # the first closing tag, so other links on the same line are untouched.
    _MENTION_RE = re.compile(r'<a href="#comment-\d+"[^>]*>(@.+?)</a>')

    def __init__(self, db_name: str) -> None:
        """Connect to *db_name* using the credentials in ``~/.my.cnf``."""
        self.db_name = db_name
        self.connection = pymysql.connect(
            read_default_file='~/.my.cnf',
            # "db" is a deprecated alias of "database" in PyMySQL.
            database=db_name,
            cursorclass=DictCursor,
        )
        # comment_ID -> row, filled by iter_row() and used to resolve parents.
        self.comment_id_map = {}
        self.id_prefix = 'wp_comment_'

    def iter_row(self) -> Iterator[dict]:
        """Yield regular comment rows ordered by ``comment_ID``.

        Every fetched row is registered in ``comment_id_map`` before the
        first row is yielded, so ancestors can be looked up regardless of the
        order in which rows are consumed.
        """
        # WordPress 5.5+ stores regular comments as 'comment'; older versions
        # used an empty string. Pingbacks/trackbacks are still excluded.
        sql = (
            "SELECT * FROM wp_comments "
            "WHERE comment_type IN ('', 'comment') "
            "ORDER BY comment_ID"
        )
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
        for row in rows:
            self.comment_id_map[row['comment_ID']] = row
        yield from rows

    @staticmethod
    def md5(s: str) -> str:
        """Return the lowercase hex MD5 of *s*, or ``''`` for an empty string."""
        if s == '':
            return ''
        return hashlib.md5(s.encode()).hexdigest()

    def post_id_to_url(self, post_id) -> str:
        """Return the site-relative URL of the post with id *post_id*."""
        return f'/post/{post_id}.html'

    def post_id_to_href(self, post_id) -> str:
        """Return the absolute https URL of the post with id *post_id*."""
        return f'https://{DOMAIN}{self.post_id_to_url(post_id)}'

    def get_root_id(self, comment: dict) -> str:
        """Return the prefixed id of the top-most known ancestor of *comment*.

        A comment without a parent is its own root. The walk also stops when
        the parent is not in ``comment_id_map`` (for example a filtered-out
        pingback) or when a parent cycle is detected, instead of raising
        ``KeyError`` or recursing forever.
        """
        current = comment
        seen = set()
        while True:
            seen.add(current['comment_ID'])
            parent_id = current['comment_parent']
            if parent_id == 0 or parent_id in seen:
                break
            parent = self.comment_id_map.get(parent_id)
            if parent is None:
                break
            current = parent
        return f"{self.id_prefix}{current['comment_ID']}"

    def format_content(self, content: str) -> str:
        """Strip reply-mention links from *content* and wrap it in ``<p>``.

        ``<a href="#comment-36" rel="nofollow">@ym</a>`` becomes ``@ym``.
        """
        content = self._MENTION_RE.sub(r'\1', content)
        return f'<p>{content}</p>'

    @staticmethod
    def format_ts(ts: datetime) -> int:
        """Return *ts* as milliseconds since the Unix epoch.

        Naive values are interpreted as UTC, because the caller passes
        ``comment_date_gmt``; ``datetime.timestamp()`` would instead treat
        them as local time and shift the result by the machine's UTC offset.
        Aware values are converted to UTC first.
        """
        offset = ts.utcoffset()
        if offset is not None:
            ts = (ts - offset).replace(tzinfo=None)
        delta = ts - datetime(1970, 1, 1)
        # Integer arithmetic avoids float rounding of the millisecond value.
        return (
            delta.days * 86_400_000
            + delta.seconds * 1000
            + delta.microseconds // 1000
        )

    def convert_row(self, row: dict) -> dict:
        """Convert one ``wp_comments`` row into an export record.

        A comment whose parent is not present in ``comment_id_map`` is
        exported as a top-level comment (``pid``/``rid`` are ``None``) so the
        record never points at a comment missing from the export.
        """
        parent_id = row['comment_parent']
        is_reply = parent_id != 0 and parent_id in self.comment_id_map
        mail = row['comment_author_email']
        timestamp = self.format_ts(row['comment_date_gmt'])
        post_id = row['comment_post_ID']
        return {
            "_id": f'{self.id_prefix}{row["comment_ID"]}',
            "nick": row['comment_author'],
            "mail": mail,
            # Avatar services such as Gravatar hash the trimmed, lower-cased
            # address, so normalise before hashing.
            "mailMd5": self.md5(mail.strip().lower()),
            "link": row['comment_author_url'],
            "ua": row['comment_agent'],
            "ip": row['comment_author_IP'],
            # presumably user 1 is the site owner -- verify for your site.
            "master": row['user_id'] == 1,
            "url": self.post_id_to_url(post_id),
            "href": self.post_id_to_href(post_id),
            "comment": self.format_content(row['comment_content']),
            "pid": f'{self.id_prefix}{parent_id}' if is_reply else None,
            "rid": self.get_root_id(row) if is_reply else None,
            "isSpam": False,
            "created": timestamp,
            "updated": timestamp,
        }

    def convert(self) -> None:
        """Write all converted comments to ``<db_name>_wp_comments.json``."""
        converted = [self.convert_row(row) for row in self.iter_row()]
        json_path = Path(f'{self.db_name}_wp_comments.json')
        # ensure_ascii=False emits raw non-ASCII text, so the encoding must be
        # explicit rather than whatever the locale default happens to be.
        json_path.write_text(
            json.dumps(converted, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )
if __name__ == '__main__':
    # Usage: script.py DB_NAME -- writes DB_NAME_wp_comments.json to the
    # current working directory.
    if len(sys.argv) != 2:
        sys.exit(f'usage: {sys.argv[0]} DB_NAME')
    converter = Converter(sys.argv[1])
    try:
        converter.convert()
    finally:
        # Release the MySQL connection even if the conversion fails.
        converter.connection.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment