Skip to content

Instantly share code, notes, and snippets.

@wzyboy
Last active May 23, 2024 08:58
Show Gist options
  • Save wzyboy/49c25333a550903d180563fdea8f0c90 to your computer and use it in GitHub Desktop.
Save wzyboy/49c25333a550903d180563fdea8f0c90 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import hashlib
import json
import re
import sys
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path

import pymysql
from pymysql.cursors import DictCursor
# Domain used to build the absolute "href" of every exported comment.
# If your permalinks differ, also update Converter.post_id_to_url().
DOMAIN = 'wzyboy.im'


class Converter:
    """Export the comments of a WordPress database to a JSON file.

    Rows are read from the ``wp_comments`` table and reshaped into flat
    dictionaries (``_id``, ``nick``, ``mail``, ``pid``, ``rid``, ...).

    NOTE(review): the output field names look like the Twikoo comment
    schema -- confirm against the system that will import the file.
    """

    # WordPress "reply to" anchor, e.g.
    #   <a href="#comment-36" rel="nofollow">@ym</a>
    # Both quantifiers are bounded ([^>]* and a lazy .+?) so that one match
    # can never run past its own closing </a> into a later link.
    _MENTION_LINK_RE = re.compile(r'<a href="#comment-\d+"[^>]*>(@.+?)</a>')

    def __init__(self, db_name: str) -> None:
        """Connect to the MySQL database *db_name*.

        Connection settings are taken from ``~/.my.cnf``.
        """
        self.db_name = db_name
        self.connection = pymysql.connect(
            read_default_file='~/.my.cnf',
            db=db_name,
            cursorclass=DictCursor,
        )
        # comment_ID -> raw row, used to walk reply chains.
        self.comment_id_map: dict = {}
        self.id_prefix = 'wp_comment_'

    def iter_row(self) -> Iterator[dict]:
        """Yield every regular comment row, ordered by ``comment_ID``.

        ``comment_id_map`` is filled with *all* rows before the first one is
        yielded, so parent lookups work regardless of row order.
        """
        # Regular comments have comment_type '' (WordPress < 5.5) or
        # 'comment' (WordPress >= 5.5); pingbacks/trackbacks are skipped.
        # Single quotes keep the statement valid under ANSI_QUOTES.
        sql = (
            'SELECT * FROM wp_comments '
            "WHERE comment_type IN ('', 'comment') "
            'ORDER BY comment_ID'
        )
        with self.connection.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
        self.comment_id_map.update((row['comment_ID'], row) for row in rows)
        yield from rows

    @staticmethod
    def md5(s: str) -> str:
        """Return the hexadecimal MD5 digest of *s* ('' for empty input)."""
        if not s:
            return ''
        return hashlib.md5(s.encode()).hexdigest()

    def post_id_to_url(self, post_id: str) -> str:
        """Return the site-relative URL of the post with id *post_id*."""
        return f'/post/{post_id}.html'

    def post_id_to_href(self, post_id) -> str:
        """Return the absolute URL (``https://DOMAIN/...``) of a post."""
        return f'https://{DOMAIN}{self.post_id_to_url(post_id)}'

    def get_root_id(self, comment: dict) -> str:
        """Return the prefixed id of the top-most ancestor of *comment*.

        A comment without a parent is its own root.  The walk stops at the
        highest ancestor present in ``comment_id_map``: a parent that was
        not exported (or a parent cycle) ends the chain instead of raising.
        """
        current = comment
        visited = {current['comment_ID']}
        while current['comment_parent'] != 0:
            parent = self.comment_id_map.get(current['comment_parent'])
            if parent is None or parent['comment_ID'] in visited:
                break
            visited.add(parent['comment_ID'])
            current = parent
        return f"{self.id_prefix}{current['comment_ID']}"

    def format_content(self, content: str) -> str:
        """Strip mention hyperlinks from *content* and wrap it in ``<p>``.

        ``<a href="#comment-36" rel="nofollow">@ym</a>`` becomes ``@ym``;
        every other link is left untouched.
        """
        without_mentions = self._MENTION_LINK_RE.sub(r'\1', content)
        return f'<p>{without_mentions}</p>'

    @staticmethod
    def format_ts(ts: datetime) -> int:
        """Return *ts* as milliseconds since the Unix epoch.

        A naive datetime is interpreted as UTC (the value comes from
        ``comment_date_gmt``); ``datetime.timestamp()`` alone would treat
        it as local time and shift the result by the machine's UTC offset.
        """
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return int(ts.timestamp() * 1000)

    def convert_row(self, row: dict) -> dict:
        """Convert one ``wp_comments`` row into an export dictionary.

        A comment whose parent is not in ``comment_id_map`` is exported as a
        top-level comment (``pid`` and ``rid`` are ``None``) so the output
        never references an id that does not exist in the file.
        """
        parent_id = row['comment_parent']
        is_reply = (
            parent_id != 0
            and parent_id != row['comment_ID']
            and parent_id in self.comment_id_map
        )
        mail = row['comment_author_email']
        timestamp = self.format_ts(row['comment_date_gmt'])
        # NOTE(review): every row is exported with isSpam=False, whatever
        # its comment_approved status is; presumably pending/spam rows
        # should be filtered or flagged -- confirm.
        return {
            "_id": f'{self.id_prefix}{row["comment_ID"]}',
            "nick": row['comment_author'],
            "mail": mail,
            # Avatar-style hashes are computed over the trimmed,
            # lower-cased address, not the raw column value.
            "mailMd5": self.md5((mail or '').strip().lower()),
            "link": row['comment_author_url'],
            "ua": row['comment_agent'],
            "ip": row['comment_author_IP'],
            # user_id 1 is taken to be the site owner.
            "master": row['user_id'] == 1,
            "url": self.post_id_to_url(row['comment_post_ID']),
            "href": self.post_id_to_href(row['comment_post_ID']),
            "comment": self.format_content(row['comment_content']),
            "pid": f'{self.id_prefix}{parent_id}' if is_reply else None,
            "rid": self.get_root_id(row) if is_reply else None,
            "isSpam": False,
            "created": timestamp,
            "updated": timestamp,
        }

    def convert(self) -> None:
        """Write all converted comments to ``<db_name>_wp_comments.json``.

        The file is created in the current working directory.
        """
        converted = [self.convert_row(row) for row in self.iter_row()]
        json_path = Path(f'{self.db_name}_wp_comments.json')
        # ensure_ascii=False emits raw non-ASCII text, so the encoding must
        # be explicit rather than whatever the locale default happens to be.
        json_path.write_text(
            json.dumps(converted, indent=2, ensure_ascii=False),
            encoding='utf-8',
        )
def main(argv=None) -> int:
    """Run the export for the database named on the command line.

    *argv* is the full argument vector (program name first) and defaults
    to ``sys.argv``.  Returns the process exit status: 0 on success, 2 when
    the database name is missing.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 2:
        prog = args[0] if args else 'wp_comments_export'
        print(f'usage: {prog} DB_NAME', file=sys.stderr)
        return 2
    converter = Converter(args[1])
    try:
        converter.convert()
    finally:
        # Release the MySQL connection even if the conversion fails.
        converter.connection.close()
    return 0


if __name__ == '__main__':
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment