Skip to content

Instantly share code, notes, and snippets.

@cporoske
Last active August 20, 2019 07:25
Show Gist options
  • Save cporoske/a1fad6ca667a8261e1f5de5d00a4d27d to your computer and use it in GitHub Desktop.
Zhihu Spider.
# -*- coding: utf-8 -*-
"""
Created on Sat Jan 28 11:15:17 2017
@author: SharkU
"""
import copy
from http.cookiejar import LWPCookieJar, LoadError
import urllib.parse
import time
import queue
import threading
import requests
class Spider:
    """Minimal crawler framework backed by a persistent ``requests`` session.

    URLs are drained from an internal queue either in the calling thread or
    by a pool of daemon worker threads. Subclasses override :meth:`parser`,
    :meth:`analyze_rules` and :meth:`store` for site-specific behaviour.
    """

    def __init__(self, name, base_url=None, headers=None, data=None,
                 isthread=False, threadnum=2):
        """
        :param name: spider name; also used as the cookie-jar file name.
        :param base_url: first URL enqueued for crawling (skipped if None).
        :param headers: extra HTTP headers installed on the session.
        :param data: default form payload for POST requests.
        :param isthread: crawl with worker threads instead of a single loop.
        :param threadnum: number of worker threads when ``isthread`` is set.
        :raises TypeError: if ``headers`` is not dict-like.
        """
        self._name = name
        self._url = queue.Queue()  # frontier of URLs still to fetch
        self.base_url = base_url
        if self.base_url is not None:  # don't enqueue a None seed
            self._url.put(self.base_url)
        # Explicit raise instead of the original assert (asserts vanish under -O).
        if headers is not None and not isinstance(headers, dict):
            raise TypeError("headers must be dict-like.")
        self._headers = {}
        self._headers.update(headers or {})
        self._data = data
        self._time = 0.2  # politeness delay (seconds) between requests
        self._isthread = isthread
        self._threadnum = threadnum
        # One session so cookies and headers persist across all requests.
        self._session = requests.Session()
        self._session.headers = self._headers
        # LWPCookieJar persists the login session to a file named after the spider.
        self._session.cookies = LWPCookieJar(filename=self._name)
        try:
            self._session.cookies.load()
        except (LoadError, OSError):
            print("Spider %s cookie is not exist" % self._name)

    def save(self):
        """Persist the session cookies to disk."""
        self._session.cookies.save(self._name)

    @property
    def headers(self):
        """A defensive deep copy of the session headers."""
        return copy.deepcopy(self._headers)

    @property
    def data(self):
        """A defensive deep copy of the default POST payload."""
        return copy.deepcopy(self._data)

    @data.setter
    def data(self, data):
        # BUG FIX: the original setter function was named ``set_data``, which
        # left the ``data`` property read-only (setting it raised
        # AttributeError) and created a stray ``set_data`` class attribute.
        self._data = data

    @property
    def time(self):
        """Delay in seconds slept between consecutive requests."""
        return self._time

    @time.setter
    def time(self, time):
        self._time = time

    def start(self):
        """Run the crawl, threaded or single-threaded per construction flag."""
        if self._isthread:
            self.multithread()
        else:
            self.singlethread()

    def singlethread(self):
        """Drain the URL queue in the calling thread."""
        while self._url.qsize():
            url = self._url.get()
            response = self._get(url)
            self.analyze_rules(response)  # may enqueue follow-up URLs
            result = self.parser(response)
            self.store(result)
            time.sleep(self._time)

    def _worker(self):
        """Worker loop for threaded mode; runs until the process exits."""
        while True:
            url = self._url.get()
            try:
                response = self._get(url)
                self.analyze_rules(response)  # follow-up rules
                result = self.parser(response)
                self.store(result)
            except Exception as exc:
                # Keep the worker alive on a bad URL instead of dying silently.
                print("Spider %s worker error: %s" % (self._name, exc))
            finally:
                # BUG FIX: always mark the task done, even on error — otherwise
                # ``Queue.join`` in :meth:`multithread` blocks forever.
                self._url.task_done()
            time.sleep(self._time)

    def multithread(self):
        """Spawn daemon workers and block until the queue is fully processed."""
        for _ in range(self._threadnum):
            # daemon=True replaces the deprecated setDaemon() call.
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
        self._url.join()  # block until every enqueued URL is processed

    def parser(self, response):
        """Parse a response into a result object. Override in subclasses."""

    def analyze_rules(self, response):
        """Inspect a response and enqueue follow-up URLs. Override in subclasses."""

    def store(self, result):
        """Persist a parsed result. Override in subclasses."""

    def _get(self, url):
        return self._session.get(url)

    def _post(self, url, data=None):
        # BUG FIX: the original computed ``data`` but never sent it.
        return self._session.post(url, data=data or self._data)

    def request(self, method, url, headers=None, data=None):
        """Dispatch ``method`` ('get' or 'post') to the matching helper.

        BUG FIX: the original discarded the response; it is now returned.
        ``headers`` is accepted for interface compatibility but, as before,
        only the session-level headers are actually sent.

        :raises KeyError: for any method other than 'get' or 'post'
            (same exception as the original dict dispatch).
        """
        method = method.lower()
        if method == "get":
            return self._get(url)
        if method == "post":
            return self._post(url, data)
        raise KeyError(method)
# Placeholder entry point: this module is meant to be imported, not run.
if __name__ == "__main__":
    pass
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 27 01:30:36 2017
@author: SharkU
"""
import datetime
import json
import os.path
import re
import time

import lxml.etree
import lxml.html
try:
from PIL import Image
except ImportError:
pass
import Spider
class ZhihuSpider(Spider.Spider):
    """Crawler for one Zhihu question: handles login (with optional captcha),
    follows the answer-list pagination, and extracts per-answer metadata plus
    any titles quoted in 《…》 brackets."""

    def __init__(self, name, base_url, headers=None, data=None,
                 isthread=False, threadnum=2):
        # ``headers or {}`` avoids the mutable default argument while still
        # passing a dict to the parent's dict check.
        super().__init__(name, base_url, headers or {}, data, isthread, threadnum)
        self._urlset = set()  # URLs already enqueued, so pages aren't re-fetched
        self._urlset.add(base_url)

    def get_xsrf(self):
        """Return the anti-CSRF token embedded in the Zhihu front page,
        or an empty string when it cannot be found."""
        url = 'https://www.zhihu.com'
        page = self._session.get(url)
        content = page.text
        pattern = r'name="_xsrf" value="(.*?)"'
        tokens = re.findall(pattern, content)
        # BUG FIX: findall() returns a list; the login form needs the string.
        return tokens[0] if tokens else ''

    # Fetch the login captcha image.
    def get_captcha(self):
        """Download the captcha, show it via Pillow (or tell the user where
        the file was saved), and return the code the user types in."""
        t = str(int(time.time() * 1000))
        captcha_url = 'https://www.zhihu.com/captcha.gif?r=' + t + "&type=login"
        r = self._session.get(captcha_url)
        with open('captcha.jpg', 'wb') as f:
            f.write(r.content)
        # Best-effort display: Pillow may be missing (the module-level import
        # is optional) or there may be no display; fall back to a hint.
        try:
            im = Image.open('captcha.jpg')
            im.show()
            im.close()
        except Exception:
            print(u'请到 %s 目录找到captcha.jpg 手动输入' % os.path.abspath('captcha.jpg'))
        captcha = input("please input the captcha:")
        return captcha

    def isLogin(self):
        """Return True when the saved cookies still grant access to the
        login-protected profile settings page (HTTP 200, no redirect)."""
        url = "https://www.zhihu.com/settings/profile"
        status = self._session.get(url, allow_redirects=False,
                                   headers=self._headers).status_code
        # BUG FIX: the original ``int(x=login_code)`` raises TypeError on
        # Python 3.8+, where int() no longer accepts the ``x`` keyword.
        return status == 200

    def login(self, secret, account):
        """Log in with an email address or an 11-digit phone number,
        retrying once with a captcha when the first attempt fails."""
        # An 11-digit number starting with 1 is treated as a phone number.
        if re.match(r"^1\d{10}$", account):
            print("手机号登录")
            post_url = 'https://www.zhihu.com/login/phone_num'
            postdata = {
                '_xsrf': self.get_xsrf(),
                'password': secret,
                'remember_me': 'true',
                'phone_num': account,
            }
        else:
            print("邮箱登录 ")
            post_url = 'https://www.zhihu.com/login/email'
            postdata = {
                '_xsrf': self.get_xsrf(),
                'password': secret,
                'remember_me': 'y',
                'email': account,
            }
        # First attempt without a captcha.
        login_page = self._session.post(post_url, data=postdata)
        # BUG FIX: Response has no ``.status`` attribute (it is
        # ``.status_code``); the original always fell into its except branch.
        print(login_page.status_code)
        print(login_page.text)
        # BUG FIX (security): parse the JSON reply with json.loads instead of
        # calling eval() on server-controlled text.
        reply = json.loads(login_page.text)
        # NOTE(review): a non-zero 'r' appears to mean failure (captcha
        # required) in Zhihu's legacy login API — TODO confirm.
        if reply.get('r') != 0:
            postdata["captcha"] = self.get_captcha()
            login_page = self._session.post(post_url, data=postdata)
            print(login_page.status_code, login_page.text)
            reply = json.loads(login_page.text)
        print(reply['msg'])
        self._session.cookies.save()

    def analyze_rules(self, response):
        """Enqueue the next results page when the pager shows one."""
        html = lxml.etree.HTML(response.text)
        BASE_URL = "https://www.zhihu.com/question/29364545"
        nxt = html.xpath('//*[@id="zh-single-question-page"]/div[1]/div/div[7]/div/span[last()]/a')
        if nxt:  # a "next page" link exists
            # href is presumably a relative query string ("?page=N") — hence
            # plain concatenation with the question URL.
            url = BASE_URL + nxt[0].attrib['href']
            # NOTE: the check-then-add below is not thread-safe; the worst
            # case is a duplicate fetch of one page, which is harmless here.
            if url not in self._urlset:
                self._urlset.add(url)
                self._url.put(url)

    def parser(self, response):
        """Extract (author, date, votes, titles-in-《》) for every answer on
        the page and return them as a list of tuples."""
        result = []
        pattern = r"《(.*?)》"
        html = lxml.html.fromstring(response.text)
        content_list = html.xpath('//*[@id="zh-question-answer-wrap"]/div')
        for item in content_list:
            createdate = datetime.date.fromtimestamp(int(item.attrib['data-created']))
            author = item.cssselect("a[class='author-link']")
            author = author[0].text if author else "匿名用户"
            vote = item.cssselect("span[class='js-voteCount']")
            vote = vote[0].text if vote else '0'
            content = item.cssselect("div[class='zm-editable-content clearfix']")
            if not content:  # BUG FIX: guard against answers without a body
                continue
            song_list = re.findall(pattern, content[0].text_content())
            result.append((author, str(createdate), vote, song_list))
        return result

    def store(self, result):
        """Append one tab-separated line per answer to '<name>_result.txt'.

        BUG FIX: the original opened the empty filename "" (which always
        raises FileNotFoundError). utf-8 is forced so the Chinese text is
        written correctly regardless of the platform default encoding.
        """
        with open(self._name + "_result.txt", 'a', encoding='utf-8') as f:
            for author, created, vote, songs in result:
                f.write(author + '\t\t' + created + '\t\t' + vote + '\t\t'
                        + ','.join(songs) + '\n')
# Browser-like request headers sent with every Zhihu request; copied from a
# desktop Firefox session so the site serves the normal HTML pages.
zhihu_headers = {
    "Host": "www.zhihu.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64; rv:51.0) Gecko/20100101 Firefox/51.0",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
    "Accept-Encoding": "gzip, deflate, br",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
    "Cache-Control": "max-age=0"
}
if __name__ == "__main__":
    # Crawl one Zhihu question with ten worker threads, reusing any cookies
    # saved under the spider name "cookie", and report the elapsed time.
    spider = ZhihuSpider(
        "cookie",
        "https://www.zhihu.com/question/29364545?sort=created&page=1",
        headers=zhihu_headers,
        isthread=True,
        threadnum=10,
    )
    if spider.isLogin():
        print("已经登录")
    spider.time = 0.4
    started = time.time()
    spider.start()
    print("一共用时: ", time.time() - started)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment