Skip to content

Instantly share code, notes, and snippets.

@luw2007
Created February 27, 2013 07:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save luw2007/5045929 to your computer and use it in GitHub Desktop.
Save luw2007/5045929 to your computer and use it in GitHub Desktop.
用来抓取 ikandou.com 网站中的书籍信息的脚本 需要安装 mongodb 数据库. - pymongo 用于存储数据库 - requests 方便的读取页面 - pyquery 使用jquery 的语法来控制网页和xml, 其基于lxml - lxml
#!/usr/bin/env python2
#-*- coding:utf-8 -*-
"""
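Script for scraping information from the ikandou.com website.
Requires a MongoDB database to be installed.
- pymongo: used to store data in the database
- requests: convenient page fetching
- pyquery: jQuery-style syntax for working with web pages and XML; built on lxml
- lxml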
"""
from time import sleep
import requests
from pyquery import PyQuery as pq
from pymongo import Connection
# MongoDB host handed to the pymongo connection.
HOST = 'localhost'
# Name of the MongoDB database that holds every collection used below.
DB_NAME = 'ikandou'
# Collection that stores the progress record (the next book id to fetch).
TABLE_MISSION = 'mission'
# Field of the progress record that carries the next book id.
MISSION_KEY = 'next_mission'
# Collection that receives one document per scraped book.
TABLE_BOOKS = 'books'
# Book page URL template; formatted with the integer book id.
URL_FORMAT = 'http://ikandou.com/book/%i'
# Base delay, in seconds, slept between fetch attempts.
TIMEOUT = 1
# Default credentials; when either is None no login is attempted.
USERNAME = None
PASSWORD = None
class Info(dict):
    """Declarative description of the fields scraped from a book page.

    Each top-level key names a section of the result. A section is a dict
    mapping an output field to either

    * a CSS selector string -- the text of the selection is stored, or
    * a ``[selector, func]`` pair -- ``func`` receives the selection and
      its return value is stored.

    Two entry names inside a section are reserved for the consumer:

    * ``'key'`` -- selector of the sub-tree the section's selectors are
      applied to. In a section that also has ``'total'`` it is a
      ``[selector, func]`` pair and ``func(selector, document)`` returns
      one selector per repeated item.
    * ``'total'`` -- marks the section as a list; its value is the field
      map applied to every item located through ``'key'``.
    """

    def __init__(self, **kwargs):
        """Build the mapping from ``kwargs``, then install the built-in spec.

        The built-in sections are applied last, so they override any
        keyword argument that uses the same section name.
        """
        super(Info, self).__init__(**kwargs)
        self.update({
            u'介绍': {
                'key': 'div.mbook-detail',
                u'图片': ['img', lambda x: x.attr('src')],
                u'书名': 'div.title h2',
                u'作者': 'div.title span.author',
                u'书籍信息': 'div.title span.subtitle',
                u'简介': 'div.summary',
            },
            u'ikandou': {
                u'喜欢': ['strong.like-number', self.__get_num],
                u'收藏': ['strong.fork-number', self.__get_num],
                u'下载': ['span.meta-act-link.view-toggle', self.__get_num],
                u'标签': 'ol#tags.tags.group',
            },
            u'版本': {
                # 'key' locates the matching blocks of markup.
                'key': ['li.response.comment.group.owner', self.__find_ebook_ids],
                # 'total' marks the content as a list: every element found
                # through 'key' has to be visited.
                'total': {
                    u'投票数': ['span.votesnum', self.__get_num],
                    u'上传者': 'div.desc-body a.url',
                    # Both values come from the same node: the text is split
                    # once on the first space and a fixed-length prefix is
                    # dropped from each half.
                    u'上传时间': ['div.desc-body span.date', lambda x: x.text().split(' ', 1)[0][3:]],
                    u'大小': ['div.desc-body span.date', lambda x: x.text().split(' ', 1)[1][2:]],
                    u'类型': ['ul.ebook-tab li.download a', lambda x: x.text()[3:]],
                    u'下载地址': ['ul.ebook-tab li.download a', lambda x: x.attr('href')],
                    u'下载次数': ['ul.ebook-tab li.download span', self.__get_num],
                    u'推送次数': ['ul.ebook-tab li.pushed span', self.__get_num],
                    u'推送次数_': ['ul.ebook-tab li.pushto span', self.__get_num],
                    u'评论': ['ol.comments li', self.__get_comment],
                },
            },
        })

    def __get_comment(self, s):
        """Return the comments found in the selection ``s``.

        The node matching ``li.comment-submission.last.top`` (presumably the
        comment form -- confirm against the page markup) is removed first.
        Every ``li h2`` / ``li p`` pair then yields one list: the
        whitespace-split words of the heading followed by the paragraph text.
        """
        s('li.comment-submission.last.top').remove()
        return [
            heading.text_content().split() + [body.text_content()]
            for heading, body in zip(s('li h2'), s('li p'))
        ]

    def __get_num(self, x):
        """Return the integer formed by the ASCII digits in ``x.text()``.

        Returns 0 when ``x`` is None, when it has no text, or when the text
        contains no digit (a notice is printed in the last case).
        """
        if x is None:
            return 0
        text = x.text()
        if text is None:
            return 0
        # Build the digit string explicitly: filter() only returns a string
        # on Python 2, so int(filter(...)) fails on Python 3.
        digits = ''.join(ch for ch in text if '0' <= ch <= '9')
        try:
            return int(digits)
        except ValueError as e:
            # Only reachable when no digit was found (int('') is invalid).
            print('%s cat not change num, error %s' % (text, e))
            return 0

    def __find_ebook_ids(self, key, _pq):
        """Return an ``li#<id>`` selector for each ``_pq(key)`` match with an id."""
        return ['li#' + i.attrib['id'] for i in _pq(key) if 'id' in i.attrib]
class Ikandou(object):
    """Scraper that fetches ikandou.com book pages one id at a time.

    The id of the next page to fetch is ``self.db.count``; every parsed page
    is handed to ``self.db.insert``. The fields to extract are described by
    an ``Info`` instance stored in ``self.info``.
    """

    # Upper bound, in seconds, for the retry delay used by start().
    max_timeout = 300

    def __init__(self, username=None, password=None, host=None, db=None):
        """Store the credentials and open the database.

        :param username: account name; no login happens when it is None.
        :param password: account password; no login happens when it is None.
        :param host: passed to ``DB``.
        :param db: passed to ``DB``.
        """
        self.db = DB(host, db)
        self.username = username
        self.password = password
        self.info = Info()

    def _check_cookies(self):
        """Make sure ``self._cookies`` exists, logging in once if needed.

        Without credentials an empty cookie dict is used. Otherwise the
        login form is fetched, its first input value is posted back as the
        CSRF token together with the credentials, and the resulting cookies
        are cached for all later requests.

        :raises RuntimeError: when the login request does not answer 200.
        """
        # The cached attribute is '_cookies'; testing for 'cookies' never
        # matched, which caused a fresh login before every page fetch.
        if hasattr(self, '_cookies'):
            return
        if self.username is None or self.password is None:
            self._cookies = {}
            return
        login_url = 'http://ikandou.com/accounts/login/'
        # A session carries the cookies of the GET over to the POST and
        # also records whatever the login response (and its redirects) set.
        session = requests.Session()
        page = session.get(login_url)
        hide_key = pq(page.text)('div.page-content input:first').attr('value')
        formdata = {
            "username": self.username,
            "password": self.password,
            'remember_me': 'on',
            'csrfmiddlewaretoken': hide_key,
        }
        _post = session.post(login_url, data=formdata, allow_redirects=True)
        if _post.status_code != 200:
            # Fail loudly instead of leaving _cookies unset; start() retries.
            raise RuntimeError('login failed with HTTP status %s' % _post.status_code)
        self._cookies = session.cookies

    def _get_html(self):
        """Fetch the page for the current book id.

        :returns: the page text, or None when the response is not 200 or is
            blank; in that case the id counter is advanced to skip the page.
        """
        self._check_cookies()
        url = URL_FORMAT % self.db.count
        page = requests.get(url, cookies=self._cookies)
        if page.status_code != 200 or page.text.strip() == '':
            self.db.count += 1
            return None
        return page.text

    @staticmethod
    def _extract_fields(fields, node):
        """Apply one field map to ``node`` and return ``{field: value}``.

        A plain selector stores ``node(selector).text()``; a
        ``(selector, func)`` pair stores ``func(node(selector))``.
        """
        out = {}
        for name, rule in fields.items():
            if isinstance(rule, (list, tuple)):
                selector, func = rule
                out[name] = func(node(selector))
            else:
                out[name] = node(rule).text()
        return out

    def _parse_page(self, d):
        """Extract every section described by ``self.info`` from document ``d``.

        :param d: callable document; ``d(selector)`` returns a selection.
        :returns: dict mapping section name to a field dict, or to a list of
            field dicts for sections that declare ``'total'``.
        """
        out = {}
        for section, row in self.info.items():
            if 'total' in row:
                (selector, find_items), fields = row['key'], row['total']
                for item_selector in find_items(selector, d):
                    out.setdefault(section, []).append(
                        self._extract_fields(fields, d(item_selector)))
            else:
                # Work on a copy: popping 'key' from the shared spec made
                # every page after the first ignore the section's scope.
                fields = dict(row)
                if 'key' in fields:
                    scope = d(fields.pop('key'))
                else:
                    scope = d
                out[section] = self._extract_fields(fields, scope)
        return out

    def get_info(self):
        """Fetch, parse and store the current book page.

        Does nothing further when the page could not be fetched.
        """
        page = self._get_html()
        if page is None:
            return
        self.db.insert(self._parse_page(pq(page)))

    def _next_delay(self, timeout):
        """Return the retry delay that follows ``timeout``: doubled, capped."""
        return min(timeout * 2, self.max_timeout)

    def start(self):
        """Scrape pages forever, sleeping between attempts.

        After a success the delay is reset to ``TIMEOUT``; after an error it
        is doubled (up to ``max_timeout``) and the same page is retried.
        """
        print('start: %i' % self.db.count)
        timeout = TIMEOUT
        while True:
            try:
                self.get_info()
            except Exception as e:
                # Top-level boundary of the crawl loop: report and retry.
                # The former "2 ** timeout - 1" never grew past 1 second.
                timeout = self._next_delay(timeout)
                print('error%s, and after %s s, will try again.' % (e, timeout))
            else:
                timeout = TIMEOUT
                print('next mission: %i' % self.db.count)
            # NOTE(review): the sleep is assumed to run on every iteration
            # (success and failure alike) -- confirm against the original
            # indentation.
            sleep(timeout)
class DB(object):
    """MongoDB-backed store for scraped books and the crawl position.

    ``count`` is the id the next inserted document receives. It is kept in
    the ``TABLE_MISSION`` collection, so a new instance on the same database
    continues where the previous one stopped. Deleting ``count`` removes the
    stored position, after which it reads as 1 again.

    The example below needs a reachable MongoDB server:

    >>> b = DB(db='test')
    >>> del b.count
    >>> b.insert({'a':1})
    >>> b.insert({'b':1})
    >>> b.insert({'c':1})
    >>> b.count
    4
    >>> del b
    >>> c = DB(db='test')
    >>> c.insert({'d':1})
    >>> c.count
    5
    """

    def __init__(self, host=None, db=None):
        """Connect to ``host`` and select database ``db``.

        :param host: MongoDB host, passed to ``Connection`` unchanged.
        :param db: database name; ``DB_NAME`` is used when it is None or
            empty, since a database cannot be selected without a name.
        """
        self.db = Connection(host)[db or DB_NAME]

    @property
    def count(self):
        """Id of the next document; read from the database on first access."""
        if not hasattr(self, '_count'):
            # No stored record yet means the crawl starts at id 1.
            record = self.db[TABLE_MISSION].find_one({MISSION_KEY: {"$exists": True}}) or {}
            self._count = record.get(MISSION_KEY, 1)
        return self._count

    @count.setter
    def count(self, count):
        # Persist first (creating the record if needed), then cache locally.
        self.db[TABLE_MISSION].update(
            {MISSION_KEY: {"$exists": True}},
            {"$set": {MISSION_KEY: count}},
            upsert=True,
        )
        self._count = count

    @count.deleter
    def count(self):
        # Drop both the stored record and the cached value.
        self.db[TABLE_MISSION].remove({MISSION_KEY: {"$exists": True}})
        if hasattr(self, '_count'):
            del self._count

    def insert(self, value):
        """Store ``value`` under the current id, then advance the id.

        :param value: mapping holding the document fields. It is copied, so
            the caller's object is not given an ``_id`` entry.
        """
        doc = dict(value)
        doc['_id'] = self.count
        self.db[TABLE_BOOKS].insert(doc, manipulate=False)
        self.count += 1
def main():
    """Run the scraper, using credentials from ``ikandou_setting`` if present.

    When the optional ``ikandou_setting`` module (or either name in it)
    cannot be imported, the module-level ``USERNAME`` / ``PASSWORD``
    defaults are used instead.
    """
    # Import under different local names: importing USERNAME/PASSWORD
    # directly made them locals of main(), so the ImportError fallback hit
    # UnboundLocalError instead of reaching the module-level defaults.
    try:
        from ikandou_setting import USERNAME as username, PASSWORD as password
    except ImportError:
        username, password = USERNAME, PASSWORD
    Ikandou(username, password, HOST, DB_NAME).start()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment