Created
July 31, 2015 19:22
-
-
Save andy0130tw/317cce807f5887504ece to your computer and use it in GitHub Desktop.
I love this blog. If I had no internet connection, I would read this instead of playing Minesweeper. bs4, peewee and requests are used.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bs4 import BeautifulSoup | |
import requests | |
import re | |
import models as m | |
# Base URLs of the target blog (Matrix67).
root_url = 'http://www.matrix67.com/'
blog_url = root_url + 'blog/'
image_url = root_url + 'blogimage/'
# '{}' is filled with a page number / post id via str.format.
page_template = blog_url + 'page/{}'
post_template = blog_url + 'archives/{}'
# One shared HTTP session so TCP connections are reused across requests.
s = requests.Session()
# Pre-compiled patterns for pulling ids out of element attributes.
rexPostId = re.compile(r'post-(\d+)')
rexCommentId = re.compile(r'comment-(\d+)')
rexTagId = re.compile(r'tag-(\S+)')
rexGavatarHash = re.compile(r'\/avatar\/(.{32})')
def bs(text):
    """Parse *text* into a BeautifulSoup tree with the stdlib html.parser."""
    soup = BeautifulSoup(text, 'html.parser')
    return soup
def processTags(postId, tagIdList, tagNameList):
    """Bulk-insert the tags of one post.

    Each tag name is paired positionally with the original slug extracted
    from the post's CSS classes.  zip() drops unmatched extras instead of
    raising IndexError, and an empty result skips the insert entirely --
    peewee's insert_many() raises a sqlite3 syntax error on an empty row
    list (the same reason post_list is guarded in main()).
    """
    rows = [
        {'post': postId, 'tag': tagName, 'original_id': tagId}
        for tagId, tagName in zip(tagIdList, tagNameList)
    ]
    if rows:
        m.Tag.insert_many(rows).execute()
def processImage(postId, imageTag):
    """Download the image referenced by *imageTag* and build an Image row dict.

    Returns a dict suitable for m.Image.create_or_get(); 'content' holds the
    raw bytes on HTTP 200, or None when the download failed.
    """
    src = imageTag['src']
    req = s.get(src, stream=True)
    if req.status_code == 200:
        # .content reads the full body and applies content-decoding; the
        # original req.raw.data shortcut bypasses both and is undocumented.
        content = req.content
    else:
        content = None
    return {
        'original_url': src,
        'post': postId,
        'content': content,
    }
def processComment(postId, commentTag):
    """Extract one comment <li> into a dict for m.Comment.create_or_get().

    todo: n-level (threaded) comments are currently flattened.
    """
    comment_id = rexCommentId.search(commentTag['id']).group(1)
    cite = commentTag.select('cite.fn')[0]
    content = commentTag.find('p')
    contentText = content.renderContents()
    # The author's reply, when present, is a <span> nested inside a <p>.
    responseText = None
    contentNext = commentTag.select('p > span')
    if contentNext:
        responseText = contentNext[0].renderContents()
    # Commenter name, optionally wrapped in a link to their own site.
    web = None
    name_link = cite.find('a')
    if name_link:
        web = name_link['href']
        name = name_link.string
    else:
        name = cite.string
    try:
        # Due to incorrect markup this might fail mysteriously; fall back
        # to None.  Narrowed from a bare except so Ctrl-C still works.
        meta = commentTag.select('div.comment-meta')[0]
        # Zero-pad lone digits, then rewrite "YYYY年MM月DD日 HH:MM" as ISO-8601.
        created_tmp = re.sub(r'(\D)(\d)(?!\d)', r'\g<1>0\2', meta.find('a').string)
        created = re.sub(r'(....)年(..)月(..)日 (..:..)', r'\1-\2-\3T\4:00Z', created_tmp)
    except Exception:
        created = None
    # BUGFIX: use search(), not match() -- the "/avatar/<hash>" fragment sits
    # in the middle of the gravatar URL, so an anchored match() never hit and
    # avatarHash was always None.
    avatarMatch = rexGavatarHash.search(commentTag.select('img')[0]['src'])
    if avatarMatch:
        avatarHash = avatarMatch.group(1)
    else:
        avatarHash = None
    return {
        'id': comment_id,
        'post': postId,
        'name': name,
        'avatar': avatarHash,
        'content': contentText,
        'author_response': responseText,
        'web': web,
        'created': created
    }
def convertContent(content):
    # todo: parse all subnodes and turn into markdown format.
    # For now, return the raw inner HTML (bytes) of the entry-content div.
    return content.renderContents()
def crawlEntry(postId):
    """Fetch one blog post page and persist its images, tags and comments.

    Returns a dict describing the Post row itself; the caller (main) batches
    these and inserts them with insert_many().  Progress is printed inline.
    """
    resp = s.get(post_template.format(postId))
    doc = bs(resp.text)
    post = doc.select('#main article')[0]
    # Tag slugs are encoded in the article element's CSS classes ("tag-...").
    classes = post['class']
    tag_id_list = []
    for cls in classes:
        tag_id_match = rexTagId.search(cls)
        if tag_id_match:
            tag_id_list.append(tag_id_match.group(1))
    content = post.select('div.entry-content')[0]
    titleText = post.select('header.entry-header h1.entry-title')[0].string
    meta = doc.select('div.entry-meta')[0]
    # Normalize the "+00:00" offset to the "Z" suffix.
    created = meta.select('time.entry-date')[0]['datetime'].replace('+00:00', 'Z')
    images = post.select('img')
    imagePreview = None
    # One transaction for this post's image/tag/comment writes.
    with m.db.atomic():
        for img in images:
            # Only mirror images hosted under the blog's own image directory.
            if img['src'].find(image_url) == 0:
                imgData = processImage(postId, img)
                imgModel, _ = m.Image.create_or_get(**imgData)
                # todo: replace the src of image wisely
                if imagePreview is None:
                    # First mirrored image doubles as the post's preview image.
                    imagePreview = imgModel
        print(' IMAGE {:>2}'.format(len(images)), end='')
        tags = meta.select('a[rel="tag"]')
        processTags(postId, tag_id_list, [ tag.string for tag in tags ])
        print(' TAG {:>2}'.format(len(tags)), end='')
        comments = doc.select('#comments ul.comment-list > li')
        for comment in comments:
            commentData = processComment(postId, comment)
            m.Comment.create_or_get(**commentData)
        print(' COMMENT {:>3}'.format(len(comments)), end='')
        print()
    # NOTE(review): the original comment here was truncated ("in case that").
    contentText = convertContent(content)
    return {
        'id': postId,
        'title': titleText,
        'content': contentText,
        'created': created,
        'image': imagePreview,
        'preview': None
    }
def main(start_page=173, end_page=1):
    """Crawl blog index pages from *start_page* down to *end_page* (inclusive),
    persisting every post not already stored in the database.

    The defaults reproduce the original behaviour (pages 173..1); the
    parameters just make partial re-crawls possible without editing code.
    """
    print('Initiating database...')
    m.init()
    page = start_page
    # with m.db.atomic():
    while page >= end_page:
        print('PAGE #{:>3}...'.format(page))
        resp = s.get(page_template.format(page))
        # todo: handle 404 for out-of-range pages
        page_doc = bs(resp.text)
        # some mysterious markup mess
        # todo: replace wrongly balanced tags: </p></blockquote> -> </blockquote></p>
        entries = page_doc.select('article.post.status-publish')
        # Reverse so a page's posts are processed in chronological order.
        entries.reverse()
        post_list = []
        for entry in entries:
            post_id = rexPostId.search(entry['id']).group(1)
            print(' POST #{:>5}... '.format(post_id), end='')
            # Skip posts already crawled in a previous run.
            if m.Post.select().where(m.Post.id == post_id).first():
                print(' EXISTS')
                continue
            post_list.append(crawlEntry(post_id))
        # Guard the bulk insert: peewee's insert_many() raises a sqlite3
        # syntax error when handed an empty list.
        if post_list:
            m.Post.insert_many(post_list).execute()
        # next iteration!
        page -= 1
# Run the crawler only when executed directly, not on import.
if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from peewee import * | |
# Single SQLite file backing all models below.
db = SqliteDatabase('matrix67.sqlite')

class ModelBase(Model):
    # Shared base class so every model binds to the same database.
    class Meta:
        database = db

# Post references Image, which is declared later in this file; a peewee
# Proxy breaks the circular dependency and is initialized once Image exists.
ImageProxy = Proxy()
class Post(ModelBase):
    # Original blog post id (crawler extracts it from the "post-<id>" class).
    id = IntegerField(primary_key=True)
    title = TextField()
    # Raw inner HTML of the post body.
    content = TextField()
    created = DateTimeField()
    # First locally-mirrored image of the post, if any (via the Proxy).
    image = ForeignKeyField(ImageProxy, null=True)
    preview = TextField(null=True)
    # When the crawler last saw this post (callable default: row insert time).
    last_seek = DateTimeField(default=datetime.datetime.now)
class Comment(ModelBase):
    # Original comment id (crawler extracts it from the "comment-<id>" attr).
    id = IntegerField(primary_key=True)
    post = ForeignKeyField(Post)
    name = TextField()
    # 32-character gravatar hash, when one could be parsed from the img src.
    avatar = TextField(null=True)
    content = TextField(null=True)
    # The blog author's inline reply, when present.
    author_response = TextField(null=True)
    # Commenter's own website URL, when they supplied one.
    web = TextField(null=True)
    created = DateTimeField(null=True)
class Image(ModelBase):
    # Auto-incrementing surrogate key.
    id = PrimaryKeyField()
    original_url = TextField()
    post = ForeignKeyField(Post)
    # Raw image bytes; None when the download failed.
    content = BlobField(null=True)

# Resolve the forward reference used by Post.image above.
ImageProxy.initialize(Image)
class Tag(ModelBase):
    # Auto-incrementing surrogate key.
    id = PrimaryKeyField()
    post = ForeignKeyField(Post)
    # Human-readable tag name as shown on the page.
    tag = CharField()
    # Tag slug taken from the article's "tag-<slug>" CSS class.
    original_id = CharField(null=True)
def init():
    """Create all model tables inside one transaction.

    safe=True makes this a no-op for tables that already exist, so init()
    can be called on every crawler start.
    """
    models = [Post, Comment, Image, Tag]
    with db.atomic():
        db.create_tables(models, safe=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment