-
-
Save kenwoodjw/b326a7f784106ed0d13e80d307d7a5d0 to your computer and use it in GitHub Desktop.
知乎API爬虫
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf-8 | |
import os | |
import json | |
import time | |
import requests | |
from requests.auth import AuthBase | |
from settings import LOGIN_URL, CAPTCHA_URL | |
from config import ( | |
API_VERSION, APP_VERSION, APP_BUILD, UUID, UA, APP_ZA, CLIENT_ID, | |
TOKEN_FILE) | |
from utils import gen_login_signature | |
from exception import LoginException | |
# Form fields shared by every password sign-in request; login() copies this
# mapping and adds the per-request username, password, timestamp and signature.
LOGIN_DATA = dict(
    grant_type='password',
    source='com.zhihu.ios',
    client_id=CLIENT_ID,
)
class ZhihuOAuth(AuthBase):
    """requests auth hook that adds the app headers and ``Authorization``.

    Without a token the ``Authorization`` header is ``oauth <CLIENT_ID>``
    (used while signing in); with a token it is
    ``<Token_type> <access_token>``.
    """

    def __init__(self, token=None):
        """Remember the token to send, or ``None`` for pre-login requests.

        :param token: object exposing ``token_type`` and ``access_token``
            attributes (e.g. a ``ZhihuToken``), or ``None``.
        """
        self._token = token

    def __call__(self, r):
        """Stamp the outgoing request ``r`` with headers and return it."""
        r.headers['X-API-Version'] = API_VERSION
        # NOTE(review): this header name uses an underscore while its
        # siblings use hyphens; kept byte-identical -- confirm against the
        # API before normalising it.
        r.headers['X-APP_VERSION'] = APP_VERSION
        r.headers['X-APP-Build'] = APP_BUILD
        r.headers['x-app-za'] = APP_ZA
        r.headers['X-UDID'] = UUID
        r.headers['User-Agent'] = UA

        if self._token is None:
            auth_str = 'oauth {client_id}'.format(client_id=CLIENT_ID)
        else:
            # The token class stores these values as ``token_type`` and
            # ``access_token``; the former ``.type`` / ``.token`` lookups
            # raised AttributeError on every authenticated request.
            auth_str = '{type} {token}'.format(
                type=str(self._token.token_type).capitalize(),
                token=str(self._token.access_token),
            )
        r.headers['Authorization'] = auth_str
        return r
class ZhihuToken:
    """Access token data plus the local time at which it was created.

    The constructor's keyword names match the keys of the token JSON, so a
    decoded JSON object can be passed straight to :meth:`from_dict`.
    """

    def __init__(self, user_id, uid, access_token, expires_in, token_type,
                 refresh_token, cookie, lock_in=None, unlock_ticket=None):
        """Store the token fields and derive the absolute expiry time.

        :param expires_in: lifetime of the token in seconds, counted from
            the moment this object is created.
        :param lock_in: accepted for compatibility with the JSON; unused.
        :param unlock_ticket: accepted for compatibility; unused.
        """
        self.create_at = time.time()
        # NOTE(review): ``user_id`` and ``uid`` are stored crosswise. This is
        # preserved exactly as found because it may be deliberate -- confirm
        # against the API response before "fixing" it.
        self.user_id = uid
        self.uid = user_id
        self.access_token = access_token
        self.expires_in = expires_in
        # Fixed: this used to read ``self._create_at`` / ``self._expires_in``,
        # attributes that are never assigned, so construction always failed.
        self.expires_at = self.create_at + self.expires_in
        self.token_type = token_type
        self.refresh_token = refresh_token
        self.cookie = cookie
        # Not used
        self._lock_in = lock_in
        self._unlock_ticket = unlock_ticket

    @property
    def type(self):
        """Alias of ``token_type`` for code that reads ``token.type``."""
        return self.token_type

    @property
    def token(self):
        """Alias of ``access_token`` for code that reads ``token.token``."""
        return self.access_token

    @classmethod
    def from_file(cls, filename):
        """Build a token from the JSON object stored in ``filename``.

        :raises ValueError: if the stored JSON is not a valid token object.
        """
        with open(filename) as f:
            return cls.from_dict(json.load(f))

    @staticmethod
    def save_file(filename, data):
        """Write ``data`` (a JSON-serialisable object) to ``filename``."""
        with open(filename, 'w') as f:
            json.dump(data, f)

    @classmethod
    def from_dict(cls, json_dict):
        """Build a token from a decoded JSON mapping.

        :raises ValueError: if the mapping's keys or value types do not fit
            the constructor (missing, unexpected or wrongly typed fields).
        """
        try:
            return cls(**json_dict)
        except TypeError:
            raise ValueError(
                '"{json_dict}" is NOT a valid zhihu token json.'.format(
                    json_dict=json_dict
                ))
class ZhihuClient:
    """Sign in to the Zhihu API and hold an authenticated requests session.

    If ``token_file`` exists the token is loaded from it; otherwise the
    client signs in with ``username`` / ``passwd`` and saves the returned
    JSON to ``token_file``.
    """

    # Scratch file the captcha image is written to so the user can open it.
    # NOTE(review): the original referenced an undefined ``CAPTCHA_FILE``;
    # the '.gif' extension is an assumption -- confirm the image format.
    CAPTCHA_FILE = 'captcha.gif'

    def __init__(self, username=None, passwd=None, token_file=TOKEN_FILE,
                 verify=True):
        """Create the session and obtain a token (from file or by login).

        :param username: account name, needed only when no token file exists.
        :param passwd: account password, needed only when no token file exists.
        :param token_file: path of the JSON file used to cache the token.
        :param verify: passed to ``session.verify``. TLS verification was
            previously hard-disabled, which exposes the password to
            interception; pass ``False`` only for local debugging.
        :raises LoginException: if signing in fails.
        :raises ValueError: if ``token_file`` holds an invalid token.
        """
        self._session = requests.session()
        self._session.verify = verify
        self.username = username
        self.passwd = passwd
        # Always available, so login()/need_captcha() also work on a client
        # that was initialised from a cached token file.
        self._login_auth = ZhihuOAuth()
        if os.path.exists(token_file):
            self._token = ZhihuToken.from_file(token_file)
        else:
            json_dict = self.login()
            ZhihuToken.save_file(token_file, json_dict)
        self._session.auth = ZhihuOAuth(self._token)

    def login(self):
        """Sign in with the stored credentials and return the token JSON.

        Sets ``self._token`` on success. Prompts on the terminal for a
        captcha when the server asks for one.

        :raises LoginException: on a server-reported error, a non-JSON
            response, or a response that is not a valid token.
        """
        data = LOGIN_DATA.copy()
        data['username'] = self.username
        data['password'] = self.passwd
        gen_login_signature(data)

        if self.need_captcha():
            self._solve_captcha()

        res = self._session.post(LOGIN_URL, auth=self._login_auth, data=data)
        try:
            json_dict = res.json()
            if 'error' in json_dict:
                raise LoginException(json_dict['error']['message'])
            self._token = ZhihuToken.from_dict(json_dict)
            return json_dict
        except (ValueError, KeyError) as e:
            raise LoginException(str(e))

    def _solve_captcha(self):
        """Save the captcha image, ask the user for its text, submit it."""
        captcha_image = self.get_captcha()
        captcha_path = os.path.abspath(self.CAPTCHA_FILE)
        with open(captcha_path, 'wb') as f:
            f.write(captcha_image)
        try:
            print('Please open {0} for captcha'.format(captcha_path))
            captcha = input('captcha: ')
        finally:
            # Remove the scratch image even if the prompt is interrupted.
            os.remove(captcha_path)

        res = self._session.post(
            CAPTCHA_URL,
            auth=self._login_auth,
            data={'input_text': captcha}
        )
        try:
            json_dict = res.json()
            if 'error' in json_dict:
                raise LoginException(json_dict['error']['message'])
        except (ValueError, KeyError):
            raise LoginException('Maybe input wrong captcha value')

    def get_captcha(self):
        """Fetch the captcha image and return its raw bytes.

        NOTE(review): this method was called but never defined. The request
        shape (PUT to the captcha URL, JSON reply carrying a base64 image
        under ``img_base64``) follows the zhihu-oauth library -- confirm
        against the live API.

        :raises LoginException: if the response lacks a decodable image.
        """
        import base64

        res = self._session.put(CAPTCHA_URL, auth=self._login_auth)
        try:
            return base64.b64decode(res.json()['img_base64'])
        except (ValueError, KeyError, TypeError):
            raise LoginException('Get captcha image fail!')

    def need_captcha(self):
        """Return the server's ``show_captcha`` flag for the next sign-in.

        :raises LoginException: if the response is not JSON or lacks the flag.
        """
        res = self._session.get(CAPTCHA_URL, auth=self._login_auth)
        try:
            return res.json()['show_captcha']
        except (ValueError, KeyError, TypeError):
            # ValueError: body is not JSON; TypeError: JSON is not an object.
            raise LoginException('Show captcha fail!')
if __name__ == '__main__':
    # Script entry point: the placeholder credentials must be replaced
    # with a real account before running.
    client = ZhihuClient('YOUR_USERNAME', 'YOUR_PASSWORD')
# ---------------------------------------------------------------------------
# Next file in the gist -- presumably config.py, per the `from config import`
# lines elsewhere. (GitHub's bidirectional-Unicode notice appeared here.)
# ---------------------------------------------------------------------------
# Values sent as request headers by the auth hook.
API_VERSION = '3.0.42'
APP_VERSION = '3.28.0'
APP_BUILD = 'release'
UUID = 'AJDA7XkI9glLBWc85sk-nJ_6F0jqALu4AlY='
UA = 'osee2unifiedRelease/3.28.0 (iPhone; iOS 10.2; Scale/2.00)'
APP_ZA = (
    'OS=iOS&Release=10.2&Model=iPhone8,1&VersionName=3.28.0'
    '&VersionCode=558&Width=750&Height='
)

# Client identifier and the bytes key used to HMAC-sign login requests.
CLIENT_ID = '8d5227e0aaaa4797a763ac64e0c3b8'
APP_SECRET = b'ecbefbf6b17e47ecb9035107866380'

# Default path of the JSON file the token is cached in.
TOKEN_FILE = 'token.json'
# ---------------------------------------------------------------------------
# Next file in the gist -- presumably exception.py, per
# `from exception import LoginException`. (GitHub's bidirectional-Unicode
# notice appeared here.)
# ---------------------------------------------------------------------------
class LoginException(Exception):
    """Raised when signing in fails; ``error`` holds the reason."""

    def __init__(self, error):
        """Keep the failure reason so handlers can inspect it."""
        self.error = error

    def __str__(self):
        """Return the reason prefixed with ``Login Fail:``."""
        return 'Login Fail: {}'.format(self.error)

    # repr() and str() intentionally produce the same text.
    __repr__ = __str__
# ---------------------------------------------------------------------------
# Next file in the gist -- presumably settings.py, per
# `from settings import LOGIN_URL, CAPTCHA_URL`. (GitHub's
# bidirectional-Unicode notice appeared here.)
# ---------------------------------------------------------------------------
# Base URL of the API; the endpoint URLs below are built from it.
ZHIHU_API_ROOT = 'https://api.zhihu.com'

LOGIN_URL = '{}/sign_in'.format(ZHIHU_API_ROOT)
CAPTCHA_URL = '{}/captcha'.format(ZHIHU_API_ROOT)
# ---------------------------------------------------------------------------
# Next file in the gist -- presumably utils.py, per
# `from utils import gen_login_signature`. (GitHub's bidirectional-Unicode
# notice appeared here.)
# ---------------------------------------------------------------------------
import hashlib | |
import hmac | |
import time | |
from config import APP_SECRET | |
def gen_login_signature(data):
    """Add ``timestamp`` and ``signature`` entries to ``data`` in place.

    The signature is the hex HMAC-SHA1, keyed with ``APP_SECRET``, of the
    ``grant_type``, ``client_id``, ``source`` and ``timestamp`` values
    concatenated in that order. Returns ``None``.

    :param data: mutable mapping that already holds ``grant_type``,
        ``client_id`` and ``source`` as strings.
    :raises KeyError: if one of those keys is missing.
    """
    data['timestamp'] = str(int(time.time()))
    signed_fields = ('grant_type', 'client_id', 'source', 'timestamp')
    message = ''.join(data[name] for name in signed_fields)
    digest = hmac.new(APP_SECRET, message.encode('utf-8'), hashlib.sha1)
    data['signature'] = digest.hexdigest()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment