@cuihaoleo · Created February 23, 2019 12:52
Export is-programmer.com data to JSON
#!/usr/bin/env python3
# chito.py: scrape posts and comments from an is-programmer.com blog into
# PostgreSQL (the JSON export script below imports Chito from this module).
# requestsutils, htmlutils, pgutils and nicelogger are third-party helper
# modules, not part of the standard library.
import os
import datetime
import logging
from getpass import getpass

import psycopg2

from requestsutils import RequestsBase
from htmlutils import parse_document_from_requests
from pgutils import savepoint

logger = logging.getLogger(__name__)
class Chito(RequestsBase):
    # TODO: change this
    baseurl = 'http://cuihao.is-programmer.com/'
    auto_referer = True

    def may_login(self):
        # /admin redirects to the login page when we are not authenticated
        r = self.request('/admin', allow_redirects=False)
        if r.status_code == 302:
            self.login()

    def login(self):
        p = getpass('Login password: ')
        r = self.request('/login')
        doc = parse_document_from_requests(r)
        token = doc.xpath('//input[@name="authenticity_token"]')[0].get('value')
        form = {
            'authenticity_token': token,
            'commit': '登录',  # the submit button's label, "Log in"
            'name': os.environ['USER'],
            'password': p,
            'persist': '1',
            'utf8': '✓',
        }
        r = self.request('/login', method='POST',
                         data=form,
                         allow_redirects=False)
        if not r.headers['Location'].endswith('/admin'):
            raise Exception('failed to login: %r' % r)
    def get_comments(self, page=1):
        r = self.request('/admin/comments', params={'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="comments_table"]//tr')[1:]
        for tr in rows:
            d = {}
            d['id'] = int(tr[0][0].get('value'))
            d['name'] = tr[1].text
            # without timezone; PostgreSQL will give it a default
            d['time'] = datetime.datetime.strptime(tr[2].text, '%Y-%m-%d %H:%M')
            try:
                d['content'] = tr[3].text.replace('\r', '')
            except AttributeError:
                # empty comments
                d['content'] = ''
            d['ip'] = tr[4].text
            if len(tr[5]) == 0:
                d['email'] = None
            else:
                # the cell holds a mailto: link when an email was given
                d['email'] = tr[5][0].get('href').split(':', 1)[-1]
            # the post id is the numeric part of the post link
            d['post_id'] = int(tr[6][0].get('href').split('/')[-1].split('.')[0])
            yield d

    def get_messages(self, page=1):
        r = self.request('/admin/messages', params={'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="messages_table"]//tr')[1:]
        for tr in rows:
            d = {}
            d['id'] = int(tr[0][0].get('value'))
            d['name'] = tr[1].text
            # without timezone; PostgreSQL will give it a default
            d['time'] = datetime.datetime.strptime(tr[2].text, '%Y-%m-%d %H:%M')
            try:
                d['content'] = tr[3].text.replace('\r', '')
            except AttributeError:
                # empty messages
                d['content'] = ''
            d['ip'] = tr[4].text
            if len(tr[5]) == 0:
                d['email'] = None
            else:
                d['email'] = tr[5][0].get('href').split(':', 1)[-1]
            # guestbook messages are not attached to any post
            d['post_id'] = None
            yield d
    def get_post(self, pid):
        d = {'id': pid}
        r = self.request('/admin/posts/%d/edit' % d['id'])
        article = parse_document_from_requests(r)
        main = article.xpath('//div[@id="form_main"]')[0]
        d['title'] = main.xpath('//input[@id="article_title"]')[0].get('value')
        d['category'] = main.xpath(
            '//div[@id="category_list_remote"]//input[@checked]')[0].tail.strip()
        d['content'] = main.xpath('//textarea')[0] \
            .text_content().replace('\r', '')
        d['tags'] = [x.strip() for x in
                     main.xpath('//input[@id="article_tag_list"]')[0]
                     .get('value').split(',') if x.strip()]
        d['linktext'] = main.xpath('//input[@id="article_permalink"]')[0] \
            .get('value')
        d['state'] = 'post'
        return d

    def get_posts(self, page=1):
        r = self.request('/admin/posts', params={'page': str(page)})
        doc = parse_document_from_requests(r)
        rows = doc.xpath('//table[@id="article_table"]//tr')[1:]
        for tr in rows:
            d = {}
            d['id'] = int(tr[0][0].get('value'))
            d['title'] = tr[1][0].text
            # without timezone; PostgreSQL will give it a default
            d['time'] = datetime.datetime.strptime(tr[2].text, '%Y-%m-%d %H:%M')
            d['category'] = tr[4].text
            r = self.request('/admin/posts/%d/edit' % d['id'])
            article = parse_document_from_requests(r)
            main = article.xpath('//div[@id="form_main"]')[0]
            d['content'] = main.xpath('//textarea')[0] \
                .text_content().replace('\r', '')
            d['tags'] = [x.strip() for x in
                         main.xpath('//input[@id="article_tag_list"]')[0]
                         .get('value').split(',') if x.strip()]
            d['linktext'] = main.xpath('//input[@id="article_permalink"]')[0] \
                .get('value')
            d['state'] = 'post'
            yield d

    def get_files(self, type):
        raise NotImplementedError
def do_one_type(conn, method, sql):
    # page through one admin listing, inserting rows until we hit a record
    # that is already in the database (unique violation) or run out of pages
    with conn:
        cursor = conn.cursor()
        page = 1
        done = False
        while not done:
            data = method(page=page)
            has_data = False
            for entry in data:
                has_data = True
                try:
                    with savepoint(cursor, 'inserting'):
                        cursor.execute(sql, entry)
                except psycopg2.IntegrityError as e:
                    if e.pgcode == '23505':  # unique_violation
                        done = True
                        break
                    elif e.pgcode == '23503':  # foreign_key_violation
                        # skip stale comments
                        pass
            if not has_data:
                done = True
            page += 1
def do_work(conn, chito):
    for state in ['posts']:
        do_one_type(conn, chito.get_posts, '''
            insert into posts
            (id, title, time, category, content, tags, linktext, state)
            values
            (%(id)s, %(title)s, %(time)s, %(category)s,
             %(content)s, %(tags)s, %(linktext)s, %(state)s)
        ''')
    do_one_type(conn, chito.get_messages, '''
        insert into comments
        (id, name, time, content, ip, email, post_id)
        values
        (%(id)s, %(name)s, %(time)s, %(content)s,
         %(ip)s, %(email)s, %(post_id)s)
    ''')
    do_one_type(conn, chito.get_comments, '''
        insert into comments
        (id, name, time, content, ip, email, post_id)
        values
        (%(id)s, %(name)s, %(time)s, %(content)s,
         %(ip)s, %(email)s, %(post_id)s)
    ''')
def update_posts(conn, chito, ids):
    posts = [chito.get_post(x) for x in ids]
    if not posts:
        return

    with conn:
        cursor = conn.cursor()
        for p in posts:
            cursor.execute('''
                update posts
                set title = %(title)s,
                    category = %(category)s,
                    content = %(content)s,
                    tags = %(tags)s,
                    linktext = %(linktext)s
                where id = %(id)s
            ''', p)


if __name__ == '__main__':
    from nicelogger import enable_pretty_logging
    enable_pretty_logging('DEBUG')

    conn = psycopg2.connect('')
    chito = Chito(cookiefile='.cookie')
    chito.may_login()
    do_work(conn, chito)
    update_posts(conn, chito, [])
    del chito
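The scraper assumes the posts and comments tables already exist; the DDL is not part of this gist. A minimal sketch that matches the column lists and the unique/foreign-key error handling above might look like this (all column types are guesses, not the author's actual schema):

# Hypothetical schema setup; columns inferred from the INSERT statements above.
import psycopg2

DDL = '''
create table if not exists posts (
    id integer primary key,   -- a duplicate id (23505) stops the paging
    title text,
    time timestamptz,         -- naive input gets the server's default zone
    category text,
    content text,
    tags text[],              -- psycopg2 adapts Python lists to arrays
    linktext text,
    state text
);
create table if not exists comments (
    id integer primary key,
    name text,
    time timestamptz,
    content text,
    ip text,
    email text,
    post_id integer references posts (id)  -- 23503 marks stale comments
);
'''

conn = psycopg2.connect('')
with conn:
    conn.cursor().execute(DDL)

The next file reuses the same Chito class, but dumps everything into a JSON file instead of PostgreSQL: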
#!/usr/bin/env python3
import itertools
import json

from chito import Chito


def main():
    blog = Chito(cookiefile=".cookie")
    blog.may_login()

    dump = {
        "posts": [],
        "comments": [],
    }

    # page through posts until an empty page, fetching full content for each
    for page in itertools.count(1):
        has_data = False
        for post in blog.get_posts(page):
            has_data = True
            detail = blog.get_post(post["id"])
            post.update(detail)
            dump["posts"].append(post)
        if not has_data:
            break

    # guestbook messages and post comments share the same record layout
    for method in (blog.get_messages, blog.get_comments):
        for page in itertools.count(1):
            has_data = False
            for msg in method(page):
                has_data = True
                dump["comments"].append(msg)
            if not has_data:
                break

    with open("output.json", "w") as fout:
        # default=str turns the datetime objects into plain strings
        json.dump(dump, fout, default=str)


if __name__ == '__main__':
    main()
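output.json is the hand-off point for the two converter scripts that follow. A quick sanity check of the dump (a sketch; the field names follow the scraper above):

# Load the dump written above and print a few counts.
import json

with open("output.json") as fin:
    dump = json.load(fin)

print(len(dump["posts"]), "posts,", len(dump["comments"]), "comments")
if dump["posts"]:
    p = dump["posts"][0]
    # datetimes were serialized with default=str, so "time" is a string here
    print(p["id"], p["time"], p["title"])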
#!/usr/bin/env python3
# Convert output.json into a WordPress-style WXR feed that Disqus can import.
import datetime
import hashlib
import json
from xml.sax.saxutils import escape

import pytz


def main():
    with open("output.json") as fin:
        dump = json.load(fin)

    print("""<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0"
  xmlns:content="http://purl.org/rss/1.0/modules/content/"
  xmlns:dsq="http://www.disqus.com/"
  xmlns:dc="http://purl.org/dc/elements/1.1/"
  xmlns:wp="http://wordpress.org/export/1.0/">
<channel>""")

    posts = {}
    for post in dump["posts"]:
        posts[post["id"]] = post

    for comm in dump["comments"]:
        # guestbook messages have no post; attach them to a fixed page id
        if comm["post_id"] is None:
            comm["post_id"] = 214156

        if comm["email"] is not None:
            avatar = ("https://www.gravatar.com/avatar/" +
                      hashlib.md5(comm["email"].lower().encode()).hexdigest())
        else:
            avatar = ""

        post = posts[comm["post_id"]]
        # the dump's timestamps are naive; astimezone() interprets them in
        # the machine's local timezone before converting to UTC
        mt = datetime.datetime.strptime(comm["time"], "%Y-%m-%d %H:%M:%S")
        utctime = mt.astimezone(pytz.utc).strftime("%Y-%m-%d %H:%M:%S")

        print("""<item>
<title>{post_title}</title>
<link>{url}</link>
<dsq:thread_identifier>{page_id}</dsq:thread_identifier>
<wp:comment_status>open</wp:comment_status>
<wp:comment>
<dsq:remote>
<dsq:avatar>{avatar}</dsq:avatar>
</dsq:remote>
<wp:comment_id>{cid}</wp:comment_id>
<wp:comment_author>{username}</wp:comment_author>
<wp:comment_author_email>{email}</wp:comment_author_email>
<wp:comment_content><![CDATA[{content}]]></wp:comment_content>
<wp:comment_author_IP>{ip}</wp:comment_author_IP>
<wp:comment_date_gmt>{time}</wp:comment_date_gmt>
<wp:comment_approved>1</wp:comment_approved>
</wp:comment>
</item>""".format(
            cid=comm["id"],
            post_title=escape(post["title"]),
            url="https://blog.i-yu.me/html/zh/isp/isp_%d.html" % post["id"],
            page_id="zh/isp/isp_%d.html" % post["id"],
            time=utctime,
            avatar=avatar,
            ip=comm["ip"],
            email=comm["email"] or "",  # avoid emitting the string "None"
            username=escape(comm["name"]),
            content=comm["content"]))

    print("""
</channel>
</rss>""")


if __name__ == "__main__":
    main()
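This script writes the feed to stdout, so typical usage is to redirect it into a file (the name disqus.xml below is illustrative, not fixed by the script) and import that file at Disqus. A quick well-formedness check before uploading:

# Sketch: parse the generated feed and count the exported comments.
import xml.etree.ElementTree as ET

tree = ET.parse("disqus.xml")  # assumed output path
ns = {"wp": "http://wordpress.org/export/1.0/"}
comments = tree.getroot().findall(".//wp:comment", ns)
print(len(comments), "comments in the feed")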
#!/usr/bin/env python3
# Convert output.json into vimwiki pages, one .wiki/.html pair per post.
import json
import os
import urllib.parse
import urllib.request

from bs4 import BeautifulSoup

VIMWIKI_TEMPLATE = "blog-isp"
HTML_IMG_BASE = "/img/isp/"
BLOG_URL = "http://cuihao.is-programmer.com"


def main():
    with open("output.json") as fin:
        dump = json.load(fin)

    os.makedirs("dump/wiki", exist_ok=True)
    os.makedirs("dump/img", exist_ok=True)

    for post in dump["posts"]:
        fname = "isp_%d" % post["id"]

        with open(os.path.join("dump/wiki", fname + ".wiki"), "w") as fout:
            # vimwiki header: ":category:tag1:tag2:", then %date and %title
            buf = list(post["tags"])
            buf.insert(0, post["category"])
            print(":" + ":".join(buf) + ":", file=fout)
            print("%date", post["time"].split()[0], file=fout)
            print("%title", post["title"], file=fout)
            # print("%template", VIMWIKI_TEMPLATE, file=fout)

            # rewrite local image links; uncomment urlretrieve to download them
            soup = BeautifulSoup(post["content"], features="html.parser")
            for img in soup.findAll('img'):
                if img['src'].startswith("/"):
                    bname = img["src"].split("/")[-1]
                    url = BLOG_URL + urllib.parse.quote(img["src"])
                    # urllib.request.urlretrieve(url, "dump/img/" + bname)
                    img['src'] = os.path.join(HTML_IMG_BASE, bname)

            # plain-text body, plus a link to the raw HTML version
            print("{{{", file=fout)
            print(soup.get_text(), file=fout)
            print("}}}", file=fout)
            print('[[local:%s.html|__RAW_HTML__]]' % fname, file=fout)

        with open(os.path.join("dump/wiki", fname + ".html"), "w") as fout:
            fout.write(str(soup))

        # index line for the generated page, printed to stdout
        print("- [[%s|%s]]" % (fname, post["title"]))


if __name__ == "__main__":
    main()
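The "- [[...]]" lines printed to stdout form a vimwiki index, so they can be redirected into an index page. Each generated dump/wiki/isp_<id>.wiki file then looks roughly like this (all values made up for illustration):

:Linux:vim:
%date 2013-05-01
%title An example post title
{{{
...plain-text body extracted by BeautifulSoup...
}}}
[[local:isp_12345.html|__RAW_HTML__]]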