Skip to content

Instantly share code, notes, and snippets.

@andy0130tw
Created July 31, 2015 19:22
Show Gist options
  • Save andy0130tw/317cce807f5887504ece to your computer and use it in GitHub Desktop.
Save andy0130tw/317cce807f5887504ece to your computer and use it in GitHub Desktop.
I love this blog. If I had no internet connection, I would read it instead of playing Minesweeper. bs4, peewee, and requests are used.
from bs4 import BeautifulSoup
import requests
import re
import models as m
# Site layout: every URL the crawler requests hangs off the blog's root.
root_url = 'http://www.matrix67.com/'
blog_url = '%sblog/' % root_url
image_url = '%sblogimage/' % root_url

# URL templates; the '{}' placeholder is filled in later with str.format().
page_template = '%spage/{}' % blog_url
post_template = '%sarchives/{}' % blog_url

# One HTTP session shared by every request the crawler makes.
s = requests.Session()

# Patterns that pull identifiers out of element ids, CSS class names and
# avatar URLs found in the blog's markup.
rexPostId = re.compile(r'post-(\d+)')
rexCommentId = re.compile(r'comment-(\d+)')
rexTagId = re.compile(r'tag-(\S+)')
rexGavatarHash = re.compile(r'\/avatar\/(.{32})')
def bs(text):
    """Parse an HTML string into a BeautifulSoup tree using 'html.parser'."""
    soup = BeautifulSoup(text, features='html.parser')
    return soup
def processTags(postId, tagIdList, tagNameList):
    """Insert one Tag row for every tag name attached to a post.

    Args:
        postId: id of the post the tags belong to.
        tagIdList: tag identifiers, positionally matched to tagNameList.
            If it is shorter than tagNameList, the missing identifiers
            are stored as None.
        tagNameList: display names of the tags.

    Returns:
        None. Nothing is written when tagNameList is empty.
    """
    rows = [
        {
            'post': postId,
            'tag': tagName,
            # Do not blow up on a post that exposes fewer ids than names.
            'original_id': tagIdList[i] if i < len(tagIdList) else None,
        }
        for i, tagName in enumerate(tagNameList)
    ]
    # insert_many() with no rows yields a statement SQLite rejects with a
    # syntax error, so untagged posts must skip the insert entirely.
    if rows:
        m.Tag.insert_many(rows).execute()
def processImage(postId, imageTag):
    """Download the image an <img> tag points at and describe it as a row.

    Args:
        postId: id of the post the image appears in.
        imageTag: the parsed <img> element; its 'src' attribute is fetched.

    Returns:
        A dict with the keys 'original_url', 'post' and 'content'.
        'content' holds the downloaded bytes, or None when the server did
        not answer with HTTP 200.

    Raises:
        KeyError: if the tag has no 'src' attribute.
    """
    src = imageTag['src']
    # A plain (non-streamed) request lets requests read the whole body and
    # hand the connection back to the session, even for error responses.
    resp = s.get(src)
    # Response.content is the complete body with any Content-Encoding
    # already undone, unlike the raw urllib3 stream.
    content = resp.content if resp.status_code == 200 else None
    return {
        'original_url': src,
        'post': postId,
        'content': content,
    }
def processComment(postId, commentTag):
    """Turn one comment <li> element into a dict of Comment fields.

    Args:
        postId: id of the post the comment belongs to.
        commentTag: the parsed comment element. Its 'id' attribute must
            contain 'comment-<digits>', and it must contain a 'cite.fn'
            element and at least one <p>.

    Returns:
        A dict with the keys 'id', 'post', 'name', 'avatar', 'content',
        'author_response', 'web' and 'created'. 'avatar', 'author_response',
        'web' and 'created' are None when the corresponding markup is
        missing or cannot be parsed.

    Raises:
        AttributeError, IndexError: if the mandatory parts listed above
            are absent from the markup.
    """
    # todo: n-level comment
    commentId = rexCommentId.search(commentTag['id']).group(1)
    cite = commentTag.select('cite.fn')[0]

    # The first paragraph is stored as the comment body.
    contentText = commentTag.find('p').renderContents()

    # A <span> directly inside a <p> is stored as the author's response.
    responseText = None
    contentNext = commentTag.select('p > span')
    if contentNext:
        responseText = contentNext[0].renderContents()

    # The commenter's name is a link when a website was given.
    web = None
    name_link = cite.find('a')
    if name_link is not None:
        web = name_link.get('href')
        name = name_link.string
    else:
        name = cite.string

    try:
        # Incorrect markup can leave any of these lookups empty-handed.
        meta = commentTag.select('div.comment-meta')[0]
        # Zero-pad every single-digit number so the date has fixed widths.
        created_tmp = re.sub(r'(\D)(\d)(?!\d)', r'\g<1>0\2', meta.find('a').string)
        # Rewrite the localized date into an ISO-8601-like timestamp.
        created = re.sub(r'(....)年(..)月(..)日 (..:..)', r'\1-\2-\3T\4:00Z', created_tmp)
    except (IndexError, AttributeError, TypeError):
        created = None

    # The hash sits in the middle of the avatar URL, so the pattern has to
    # be searched for; match() only looks at the start of the string and
    # therefore never finds it.
    avatarHash = None
    avatarImages = commentTag.select('img')
    if avatarImages:
        avatarMatch = rexGavatarHash.search(avatarImages[0].get('src', ''))
        if avatarMatch:
            avatarHash = avatarMatch.group(1)

    return {
        'id': commentId,
        'post': postId,
        'name': name,
        'avatar': avatarHash,
        'content': contentText,
        'author_response': responseText,
        'web': web,
        'created': created,
    }
def convertContent(content):
    """Serialize a post's content node for storage.

    For now this simply returns whatever the node's renderContents()
    produces, without any conversion.
    """
    # todo: parse all subnodes and turn into markdown format.
    rendered = content.renderContents()
    return rendered
def crawlEntry(postId):
    """Fetch one post page, store its images, tags and comments, and
    return the fields of the post itself.

    The Post row is NOT written here; the caller receives the dict and is
    expected to insert it.

    Args:
        postId: id of the post, substituted into post_template.

    Returns:
        A dict with the keys 'id', 'title', 'content', 'created', 'image'
        and 'preview'. 'image' is the first stored image of the post (or
        None); 'preview' is always None.

    Raises:
        requests.HTTPError: if the post page answers with an error status.
        IndexError: if the page lacks the expected article markup.
    """
    resp = s.get(post_template.format(postId))
    # Fail with the real HTTP error instead of an IndexError further down.
    resp.raise_for_status()
    doc = bs(resp.text)
    post = doc.select('#main article')[0]

    # Tag ids are encoded in the article's CSS classes as 'tag-<id>'.
    tag_matches = (rexTagId.search(cls) for cls in post.get('class', []))
    tag_id_list = [found.group(1) for found in tag_matches if found]

    content = post.select('div.entry-content')[0]
    titleText = post.select('header.entry-header h1.entry-title')[0].string
    meta = doc.select('div.entry-meta')[0]
    created = meta.select('time.entry-date')[0]['datetime'].replace('+00:00', 'Z')

    images = post.select('img')
    imagePreview = None
    # NOTE(review): the original nesting under this 'with' was lost; all
    # side-table writes for the post are grouped in a single transaction.
    with m.db.atomic():
        for img in images:
            # Only images hosted under the blog's own image directory are
            # stored; an <img> without 'src' is simply skipped.
            if not img.get('src', '').startswith(image_url):
                continue
            imgData = processImage(postId, img)
            imgModel, _ = m.Image.create_or_get(**imgData)
            # todo: replace the src of image wisely
            if imagePreview is None:
                imagePreview = imgModel
        print(' IMAGE {:>2}'.format(len(images)), end='')

        tags = meta.select('a[rel="tag"]')
        processTags(postId, tag_id_list, [tag.string for tag in tags])
        print(' TAG {:>2}'.format(len(tags)), end='')

        comments = doc.select('#comments ul.comment-list > li')
        for comment in comments:
            commentData = processComment(postId, comment)
            m.Comment.create_or_get(**commentData)
        print(' COMMENT {:>3}'.format(len(comments)), end='')
    # Terminate the progress line assembled by the prints above.
    print()

    contentText = convertContent(content)
    return {
        'id': postId,
        'title': titleText,
        'content': contentText,
        'created': created,
        'image': imagePreview,
        'preview': None,
    }
def main(start_page=173):
    """Crawl the blog's index pages from start_page down to page 1.

    For every index page, each listed post that is not yet in the database
    is crawled with crawlEntry(), and the new posts of that page are
    inserted together.

    Args:
        start_page: highest index page number to visit; pages are visited
            in descending order. Defaults to 173, the value that used to
            be hard-coded.
    """
    print('Initiating database...')
    m.init()

    for page in range(start_page, 0, -1):
        print('PAGE #{:>3}...'.format(page))
        resp = s.get(page_template.format(page))
        if resp.status_code != 200:
            # A missing or broken index page must not stop the whole crawl.
            print(' HTTP {} -- skipped'.format(resp.status_code))
            continue

        page_doc = bs(resp.text)
        # some mysterious markup mess
        # todo: replace wrongly balanced tags: </p></blockquote> -> </blockquote></p>
        entries = page_doc.select('article.post.status-publish')

        post_list = []
        # Entries are processed in the reverse of their on-page order.
        for entry in reversed(entries):
            post_id = rexPostId.search(entry['id']).group(1)
            print(' POST #{:>5}... '.format(post_id), end='')
            # prevent duplicating
            if m.Post.select().where(m.Post.id == post_id).first():
                print(' EXISTS')
                continue
            post_list.append(crawlEntry(post_id))

        # An empty insert_many() would raise a sqlite3 syntax error.
        if post_list:
            m.Post.insert_many(post_list).execute()
# Start the crawl only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
import datetime
from peewee import *
# SQLite database file shared by every model (bound through ModelBase.Meta).
db = SqliteDatabase('matrix67.sqlite')
class ModelBase(Model):
    """Common base class that binds every model to the module-level db."""
    class Meta:
        # Models deriving from ModelBase read and write this database.
        database = db
# Placeholder for the Image model: Post needs a foreign key to Image, but
# Image is declared after Post. It is resolved further down with
# ImageProxy.initialize(Image).
ImageProxy = Proxy()
class Post(ModelBase):
    """A single blog post."""
    # Explicit primary key: the crawler supplies the blog's own post id.
    id = IntegerField(primary_key=True)
    title = TextField()
    # Post body as produced by the crawler's convertContent().
    content = TextField()
    created = DateTimeField()
    # Optional reference to an Image row (the crawler passes the first image
    # it stored for the post); declared through the ImageProxy placeholder.
    image = ForeignKeyField(ImageProxy, null=True)
    # Optional preview text; the crawler currently always stores None.
    preview = TextField(null=True)
    # Defaults to the local time at which the row object is created.
    last_seek = DateTimeField(default=datetime.datetime.now)
class Comment(ModelBase):
    """A reader comment attached to a post."""
    # Explicit primary key: the crawler supplies the blog's own comment id.
    id = IntegerField(primary_key=True)
    post = ForeignKeyField(Post)
    # Commenter's display name.
    name = TextField()
    # Hash extracted from the commenter's avatar URL, when one was found.
    avatar = TextField(null=True)
    # Rendered markup of the comment body.
    content = TextField(null=True)
    # Rendered markup of the reply embedded in the comment, if any.
    author_response = TextField(null=True)
    # Website the commenter's name links to, if any.
    web = TextField(null=True)
    # Creation time; None when the crawler could not parse the date.
    created = DateTimeField(null=True)
class Image(ModelBase):
    """An image referenced from a post, optionally with its downloaded bytes."""
    # Primary key generated by the database (peewee PrimaryKeyField).
    id = PrimaryKeyField()
    # The 'src' URL the image was found under.
    original_url = TextField()
    post = ForeignKeyField(Post)
    # Downloaded image data; None when the download did not succeed.
    content = BlobField(null=True)
# Image is defined now, so point the placeholder used by Post.image at it.
ImageProxy.initialize(Image)
class Tag(ModelBase):
    """One tag attached to one post (a tag shared by N posts has N rows)."""
    # Primary key generated by the database (peewee PrimaryKeyField).
    id = PrimaryKeyField()
    post = ForeignKeyField(Post)
    # Display name of the tag.
    tag = CharField()
    # Identifier the crawler extracted from the post's 'tag-...' CSS class.
    original_id = CharField(null=True)
def init():
    """Create the tables for all models inside a single transaction.

    safe=True is passed to create_tables(), so tables that already exist
    are left alone.
    """
    tables = [Post, Comment, Image, Tag]
    with db.atomic():
        db.create_tables(tables, safe=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment