Skip to content

Instantly share code, notes, and snippets.

@bay1
Last active February 15, 2024 03:58
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save bay1/4f8ad3c2d7a7b8089c0da43d6a9ab807 to your computer and use it in GitHub Desktop.
Save bay1/4f8ad3c2d7a7b8089c0da43d6a9ab807 to your computer and use it in GitHub Desktop.
# -*- coding:utf-8 -*-
#!/usr/bin/python3
import os
import shutil
import time
from re import sub
from sys import stderr
from traceback import print_exc

import pymysql
import requests
from pymongo import MongoClient

try:
    # Python 3 location of the parser; the script relies on '%a' formatting,
    # which only exists on Python 3.
    from html.parser import HTMLParser
except ImportError:
    # Original Python 2 module name, kept as a fallback.
    from HTMLParser import HTMLParser
# Forum id mapping: keys (e.g. 787) are ids from the old forum,
# values (e.g. 13) are ids in the new database.
theme_dict = {753: 13, 789: 14, 780: 15, 709: 16, 757: 17, 302: 18,
              761: 19, 788: 20, 767: 21, 405: 22, 755: 23, 721: 25, 740: 10, 796: 27, 770: 26}
# Directory containing this script; avatar source/target paths are built from it.
basedir = os.path.abspath(os.path.dirname(__file__))
user_url = url = 'https://bbs.com/api/v2/users'
topic_url = 'https://bbs.com/api/v2/topics'
# NOTE(review): 'ip' and `port` are placeholders. `port` is not defined anywhere
# in this file, so this line raises NameError until a real port is supplied.
conn = MongoClient('ip', port)
# Open the MySQL connection. Keyword arguments are used because recent PyMySQL
# releases no longer accept positional arguments for connect().
mysql_db_dz = pymysql.connect(host="ip", user="user",
                              password="password", database="database", charset='utf8')
mongo_db = conn.nodebb  # use the nodebb database (created automatically if missing)
my_set = mongo_db.objects  # use the objects collection (created automatically if missing)
# Create a cursor object with cursor(); every MySQL query below goes through it.
dz_cursor = mysql_db_dz.cursor()
headers = {"Authorization": "Bearer nodebb-writeapi-token"}  # fill in your token
class _DeHTMLParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.__text = []
def handle_data(self, data):
text = data.strip()
if len(text) > 0:
text = sub('[ \t\r\n]+', ' ', text)
self.__text.append(text + ' ')
def handle_starttag(self, tag, attrs):
if tag == 'p':
self.__text.append('\n\n')
elif tag == 'br':
self.__text.append('\n')
def handle_startendtag(self, tag, attrs):
if tag == 'br':
self.__text.append('\n\n')
def text(self):
return ''.join(self.__text).strip()
def dehtml(text):
    """Convert an HTML fragment to plain text using ``_DeHTMLParser``.

    Args:
        text: HTML markup as a string. ``None`` or an empty string is
            returned unchanged without invoking the parser.

    Returns:
        The extracted plain text. If parsing fails, the traceback is written
        to stderr and the original ``text`` is returned unchanged.
    """
    if not text:
        # Nothing to parse (e.g. a NULL database column); previously this
        # path only produced a noisy traceback before returning the input.
        return text
    try:
        parser = _DeHTMLParser()
        parser.feed(text)
        parser.close()
        return parser.text()
    except Exception:
        # Best effort: report the problem but keep the migration running.
        # `Exception` (not a bare except) so Ctrl-C / SystemExit still work.
        print_exc(file=stderr)
        return text
def get_forum_post(fid, tid):
    """Fetch the posts of one thread from ``pre_forum_post``.

    Args:
        fid: Forum id the thread belongs to.
        tid: Thread id.

    Returns:
        All matching rows as ``(author, dateline, message, authorid)``
        sequences, in whatever order the database returns them.
    """
    # Let the driver substitute and escape the values instead of formatting
    # them into the SQL text.
    forum_post_sql = ("select author,dateline,message,authorid "
                      "from pre_forum_post where fid=%s and tid=%s")
    dz_cursor.execute(forum_post_sql, (fid, tid))
    return dz_cursor.fetchall()
def get_sightml(uid):
    """Return a user's signature as plain text.

    Reads ``sightml`` from ``pre_common_member_field_forum`` for ``uid``,
    strips the HTML with ``dehtml`` and prints the result before returning it.

    Args:
        uid: User id in the old forum database.

    Returns:
        The de-HTML'd signature, or ``''`` when the user has no row in the
        table.
    """
    sightml_sql = "select sightml from pre_common_member_field_forum where uid=%s"
    dz_cursor.execute(sightml_sql, (uid,))
    row = dz_cursor.fetchone()
    if row is None:
        # No profile row for this user: treat as an empty signature instead
        # of failing on None[0].
        return ''
    # Parse once and reuse the result for both the log line and the return value.
    signature = dehtml(row[0])
    print(signature)
    return signature
def get_birth_site(uid):
    """Fetch birthday parts and website for a user.

    Args:
        uid: User id in the old forum database.

    Returns:
        The ``(birthyear, birthmonth, birthday, site)`` row from
        ``pre_common_member_profile``, or ``None`` when no row matches
        (callers index the result, so they must expect that case).
    """
    birth_site_sql = ("select birthyear,birthmonth,birthday,site "
                      "from pre_common_member_profile where uid=%s")
    # Parameterized so the driver handles quoting/escaping of uid.
    dz_cursor.execute(birth_site_sql, (uid,))
    return dz_cursor.fetchone()
def get_lastvisit(uid):
    """Return the ``lastvisit`` value stored for a user.

    Args:
        uid: User id in the old forum database.

    Returns:
        The ``lastvisit`` column from ``pre_common_member_status``.

    Raises:
        TypeError: If no row exists for ``uid`` (``fetchone()`` returns None).
    """
    lastvisit_sql = "select lastvisit from pre_common_member_status where uid=%s"
    # Parameterized so the driver handles quoting/escaping of uid.
    dz_cursor.execute(lastvisit_sql, (uid,))
    lastvisit = dz_cursor.fetchone()
    return lastvisit[0]
def get_avatar(uid):
    """Build the path of the old forum's middle-size avatar file for ``uid``.

    The absolute integer value of ``uid`` is left-padded with zeros to nine
    digits and split 3/2/2/2 into three directory names and a file prefix,
    all located under ``<basedir>/avatar``.
    """
    padded = str(abs(int(uid))).zfill(9)
    directories = '/'.join((padded[0:3], padded[3:5], padded[5:7]))
    file_prefix = padded[7:9]
    relative_path = 'avatar/' + directories + '/' + file_prefix + '_avatar_middle.jpg'
    return os.path.join(basedir, relative_path)
def mysql_query(sql):
    """Run ``sql`` on the shared MySQL cursor and return every resulting row."""
    # Execute the statement, then hand back the complete list of records.
    dz_cursor.execute(sql)
    return dz_cursor.fetchall()
def update_user():
    """Copy profile data from ``pre_common_member`` onto existing Mongo users.

    For every non-frozen row, looks up a document in the ``objects``
    collection with the same username and email. When found, sets join date,
    reputation, signature, website and last-online time on it (plus avatar
    and birthday when available); otherwise prints a "no such user" message.

    Rows are read with ``SELECT *`` and accessed by position, so this depends
    on the column order of ``pre_common_member``.

    Returns:
        None. Returns early (after printing an error) if the member query
        fails.
    """
    # SQL statement selecting all users
    common_member_sql = "SELECT * FROM pre_common_member"
    try:
        dz_cursor.execute(common_member_sql)
        results = dz_cursor.fetchall()
    except Exception:
        print("Error: unable to fetch data")
        # Without rows there is nothing to do; previously execution fell
        # through to the loop and failed with NameError on `results`.
        return
    for row in results:
        if row[22] == 1:  # skip users that are frozen
            continue
        uid = row[0]
        email = row[1]
        username = row[2]
        avatarstatus = row[6]
        # Timestamps are scaled by 1000 (presumably seconds -> milliseconds).
        regdate = int(round(row[12] * 1000))
        dz_credits = row[13]
        sightml = get_sightml(uid)
        birth_site = get_birth_site(uid)
        lastvisit = int(round(get_lastvisit(uid) * 1000))
        user = my_set.find_one({'username': username, 'email': email})
        if not user:
            print("没有此用户")
            continue
        adduser_uid = user['uid']
        update_data = {
            "joindate": regdate,
            "reputation": dz_credits,
            "signature": sightml,
            "website": birth_site[3],
            "lastonline": lastvisit
        }
        if avatarstatus != 0:
            # Copy the old avatar file next to this script under profile/
            # and point the user document at the corresponding upload URL.
            old_avatar = get_avatar(uid)
            avatar_name = '%s-profileavatar.jpg' % adduser_uid
            new_avatar_address = '/assets/uploads/profile/' + avatar_name
            new_avatar = os.path.join(basedir, 'profile/' + avatar_name)
            shutil.copy(old_avatar, new_avatar)
            update_data["picture"] = new_avatar_address
            update_data["uploadedpicture"] = new_avatar_address
        if birth_site[0] != 0:
            # A birth year of 0 is treated as "no birthday recorded".
            update_data["birthday"] = '%s/%s/%s' % (
                birth_site[0], birth_site[1], birth_site[2])
        # update_one replaces the legacy Collection.update (single-document
        # update), which is not available in current PyMongo releases.
        mongo_result = my_set.update_one(
            {"email": email}, {"$set": update_data})
        print(mongo_result.raw_result)
def create_user(limit=3):
    """Create NodeBB users from ``pre_common_member`` rows and fill their profiles.

    Each non-frozen row among the first ``limit`` rows is POSTed to
    ``user_url``. When the API answers with code ``'ok'``, the Mongo document
    with the same email is updated with join date, reputation, signature,
    website and last-online time (plus avatar and birthday when available).
    Otherwise the API response is printed.

    Rows are read with ``SELECT *`` and accessed by position, so this depends
    on the column order of ``pre_common_member``.

    Args:
        limit: Maximum number of rows to consider, counted before frozen
            users are skipped. Defaults to 3, the value that used to be
            hard-coded; pass ``None`` to process every row.

    Returns:
        None. Returns early (after printing an error) if the member query
        fails.
    """
    # SQL statement selecting all users
    common_member_sql = "SELECT * FROM pre_common_member"
    try:
        dz_cursor.execute(common_member_sql)
        results = dz_cursor.fetchall()
    except Exception:
        print("Error: unable to fetch data")
        # Previously execution continued and failed with NameError on `results`.
        return
    # Slicing (instead of indexing 0..2) also copes with fewer rows than `limit`.
    rows = results if limit is None else results[:limit]
    for row in rows:
        if row[22] == 1:  # skip users that are frozen
            continue
        uid = row[0]
        email = row[1]
        username = row[2]
        avatarstatus = row[6]
        # Timestamps are scaled by 1000 (presumably seconds -> milliseconds).
        regdate = int(round(row[12] * 1000))
        dz_credits = row[13]
        sightml = get_sightml(uid)
        birth_site = get_birth_site(uid)
        lastvisit = int(round(get_lastvisit(uid) * 1000))
        post_data = {
            'username': username,
            'email': email,
            # NOTE(review): every migrated account receives this same
            # hard-coded password; consider forcing a reset after migration.
            'password': 'RO2z2&l0TJfG$NvZ'
        }
        result = requests.post(user_url, post_data, headers=headers).json()
        if result['code'] != 'ok':
            print(result)
            continue
        adduser_uid = result['payload']['uid']
        update_data = {
            "joindate": regdate,
            "reputation": dz_credits,
            "signature": sightml,
            "website": birth_site[3],
            "lastonline": lastvisit
        }
        if avatarstatus != 0:
            # Copy the old avatar file next to this script under profile/
            # and point the user document at the corresponding upload URL.
            old_avatar = get_avatar(uid)
            avatar_name = '%s-profileavatar.jpg' % adduser_uid
            new_avatar_address = '/assets/uploads/profile/' + avatar_name
            new_avatar = os.path.join(basedir, 'profile/' + avatar_name)
            shutil.copy(old_avatar, new_avatar)
            update_data["picture"] = new_avatar_address
            update_data["uploadedpicture"] = new_avatar_address
        if birth_site[0] != 0:
            # A birth year of 0 is treated as "no birthday recorded".
            update_data["birthday"] = '%s/%s/%s' % (
                birth_site[0], birth_site[1], birth_site[2])
        # update_one replaces the legacy Collection.update (single-document
        # update), which is not available in current PyMongo releases.
        mongo_result = my_set.update_one(
            {"email": email}, {"$set": update_data})
        print(mongo_result.raw_result)
def create_user_with(userid):
    """Create a single NodeBB user from the ``cdb_members`` row of ``userid``.

    NOTE(review): this function uses ``uc_cursor`` and ``get_uc_sightml``,
    neither of which is defined in this file; they must be provided before
    this code path can run.

    Args:
        userid: User id to look up in ``cdb_members``.

    Returns:
        The new user's uid when the API answers with code ``'ok'``;
        otherwise the decoded API response itself (callers that expect a uid
        should be aware of this). ``None`` when no row matches ``userid``.
    """
    # Parameterized so the driver handles quoting/escaping of userid.
    common_member_sql = ("SELECT uid,username,regdate,lastvisit,credits,email,bday "
                         "FROM cdb_members where uid=%s")
    uc_cursor.execute(common_member_sql, (userid,))
    results = uc_cursor.fetchall()
    if not results:
        return None
    # Only the first matching row was ever processed (the original loop
    # returned during its first iteration).
    row = results[0]
    uid = row[0]
    username = str(row[1])
    # Timestamps are scaled by 1000 (presumably seconds -> milliseconds).
    regdate = int(round(row[2] * 1000))
    lastvisit = int(round(row[3] * 1000))
    dz_credits = row[4]
    email = row[5]
    # get_uc_sightml is indexed as (site, signature html).
    site_sightml = get_uc_sightml(uid)
    site = site_sightml[0]
    sightml = dehtml(site_sightml[1])
    post_data = {
        'username': username,
        'email': email,
        # NOTE(review): every migrated account receives this same
        # hard-coded password; consider forcing a reset after migration.
        'password': 'RO2z2&l0TJfG$NvZ'
    }
    result = requests.post(user_url, post_data, headers=headers).json()
    if result['code'] != 'ok':
        return result
    adduser_uid = result['payload']['uid']
    update_data = {
        "joindate": regdate,
        "reputation": dz_credits,
        "signature": sightml,
        "website": site,
        "lastonline": lastvisit
    }
    # update_one replaces the legacy Collection.update (single-document
    # update), which is not available in current PyMongo releases.
    my_set.update_one({"email": email}, {"$set": update_data})
    print(result)
    return adduser_uid
def _post_with_retry(url, data, timeout=500):
    """POST ``data`` to ``url``, retrying once after a connection error or timeout.

    Returns the response object, or ``None`` when the request (or its single
    retry) failed.
    """
    transient_errors = (
        ConnectionError,
        # requests raises its own ConnectionError, which is not a subclass of
        # the builtin one, so it has to be listed explicitly.
        requests.exceptions.ConnectionError,
        requests.exceptions.ConnectTimeout,
        requests.exceptions.ReadTimeout,
    )
    try:
        return requests.post(url, data, headers=headers, timeout=timeout)
    except transient_errors:
        time.sleep(0.3)
        try:
            return requests.post(url, data, headers=headers, timeout=timeout)
        except (ConnectionError, requests.exceptions.RequestException):
            return None
    except requests.exceptions.RequestException:
        return None


def _resolve_author_uid(username, old_uid):
    """Return the uid stored in Mongo for ``username``, creating the user if unknown."""
    found = my_set.find_one({"username": username}, {"uid": 1, "_id": 0})
    if found and 'uid' in found:
        return found['uid']
    return create_user_with(old_uid)


def add_posts(theme, new_theme):
    """Import the threads of one old forum into a NodeBB category.

    Threads are read from ``pre_forum_thread`` for forum ``theme``; except
    for forums 740, 796 and 770, only threads with more than 200 views are
    taken. For each thread the last row returned by ``get_forum_post`` is
    posted as the topic content and the remaining rows as replies. Author and
    timestamps are then written directly into the Mongo ``objects``
    collection.

    Rows are read with ``SELECT *`` and accessed by position, so this depends
    on the column order of ``pre_forum_thread``.

    Args:
        theme: Forum id (``fid``) in the old database.
        new_theme: Category id sent as ``cid`` to the topics API.

    Returns:
        None. Returns early (after printing an error) if the thread query
        fails.
    """
    # SQL statement selecting the threads
    if theme in (740, 796, 770):
        forum_thread_sql = "SELECT * FROM pre_forum_thread where fid=%s"
    else:
        forum_thread_sql = "SELECT * FROM pre_forum_thread where fid=%s and views>200"
    try:
        dz_cursor.execute(forum_thread_sql, (theme,))
        results = dz_cursor.fetchall()
    except Exception:
        print("Error: unable to fetch data")
        # Previously execution continued and failed with NameError on `results`.
        return
    # Post each thread
    for row in results:
        tid = row[0]  # thread id
        fid = row[1]  # forum id
        author = row[7]
        post_author_id = row[8]
        subject = row[9]  # thread title
        # Timestamps are scaled by 1000 (presumably seconds -> milliseconds).
        dateline = int(round(row[10] * 1000))  # time of posting
        lastpost = int(round(row[11] * 1000))  # time of last post
        views = row[13]  # view count
        forum_posts = get_forum_post(fid, tid)
        if not forum_posts:
            # A thread without any post rows has no content to import.
            continue
        author_uid = _resolve_author_uid(author, post_author_id)
        content = dehtml(forum_posts[-1][2])
        topics_post_data = {
            'cid': new_theme,
            'title': subject,
            'content': content
        }
        topic_post = _post_with_retry(topic_url, topics_post_data)
        # Skip the thread when the request failed or was rejected with 413.
        if topic_post is None or topic_post.status_code == 413:
            continue
        topic_post_result = topic_post.json()
        print('帖子:%s 已导入~' % content)
        if topic_post_result['code'] != 'ok':
            continue
        # From here on `tid` is the id of the newly created topic.
        tid = topic_post_result['payload']['topicData']['tid']
        post_update_data = {
            'uid': author_uid,
            'timestamp': dateline,
            'lastposttime': lastpost,
            'viewcount': views
        }
        # First update targets the topic document, second the main post.
        my_set.update_one({"tid": tid, "title": subject},
                          {"$set": post_update_data})
        my_set.update_one({"tid": tid, "content": content}, {
            "$set": {'uid': author_uid, 'timestamp': dateline}})
        replies = forum_posts[:-1]
        # Convert each reply once; the text is needed for posting and for
        # locating the stored post afterwards.
        reply_messages = [dehtml(forum_post[2]) for forum_post in replies]
        reply_url = topic_url + '/' + str(tid)
        for reply_message in reply_messages:
            requests_post = _post_with_retry(reply_url, {'content': reply_message})
            if requests_post is None or requests_post.status_code == 413:
                continue
            print('回复:%s 已导入~' % reply_message)
        # Second pass: set author and timestamp on the stored replies.
        for num, (forum_post, reply_message) in enumerate(zip(replies, reply_messages), 1):
            reply_author_uid = _resolve_author_uid(forum_post[0], forum_post[3])
            reply_update_data = {
                'timestamp': int(round(forum_post[1] * 1000)),
                'uid': reply_author_uid
            }
            # NOTE(review): replies are matched with tid as a string while the
            # topic above is matched with the tid as returned by the API —
            # confirm against how the server stores reply documents.
            test = my_set.update_one({"tid": str(tid), 'content': reply_message}, {
                "$set": reply_update_data})
            print(num, test.raw_result)
# To migrate every mapped forum instead of a single one:
# for theme in theme_dict:
#     add_posts(theme, theme_dict[theme])
try:
    add_posts(797, 24)
    # update_user()
finally:
    # Close the database connection, even when the import fails part-way.
    mysql_db_dz.close()
@duyuxuan
Copy link

这个能把用户账号和密码以及帖子全部迁移过去么?实际搬迁的网站域名可以发一下么。做个参考,正好有这个想法。

@bay1
Copy link
Author

bay1 commented Jul 10, 2018

@duyuxuan 账号密码效果还不错
不过帖子的迁移,有部分字段没有处理好,比如某个迁移过来的用户的发帖数,和回复数
所以这个代码,仅供参考。不保证最后的结果。如果想用,可以自己试着完善下
测试迁移
部分代码记录

@duyuxuan
Copy link

好的,我参考一下。谢谢了

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment