Created
October 10, 2015 06:46
-
-
Save messense/2dc47770f24892434340 to your computer and use it in GitHub Desktop.
模拟登录淘宝
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import re | |
import json | |
import pickle | |
import logging | |
import logging.config | |
import requests | |
from rk import RClient | |
logger = logging.getLogger(__name__) | |
class SimTaoBao(object): | |
# 登录地址 | |
taobao_login_url = "https://login.taobao.com/member/login.jhtml" | |
taobao_check_login_url = "http://paimai.taobao.com/json/checkLogin.htm" | |
# post请求头部 | |
headers = { | |
'x-requestted-with': 'XMLHttpRequest', | |
'Accept-Language': 'zh-cn', | |
'Accept-Encoding': 'gzip, deflate', | |
'ContentType': 'application/x-www-form-urlencoded; chartset=UTF-8', | |
'DNT': 1, | |
'Cache-Control': 'no-cache', | |
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:14.0) Gecko/20100101 Firefox/14.0.1', # NOQA | |
'Referer': 'https://login.taobao.com/member/login.jhtml?redirectURL=http%3A%2F%2Fwww.taobao.com%2F', # NOQA | |
'Connection': 'Keep-Alive' | |
} | |
# 请求数据包 | |
post_data = { | |
'TPL_username': '', | |
'TPL_password': '', | |
"need_check_code": "false", | |
"loginsite": 0, | |
"newlogin": 1, | |
'TPL_redirect_url': '', | |
'from': 'tbTop', | |
'fc': "default", | |
'style': 'default', | |
'css_style': '', | |
'tid': '', | |
'support': '000001', | |
'CtrlVersion': '1,0,0,7', | |
'loginType': 3, | |
'minititle': '', | |
'minipara': '', | |
"umto": "NAN", | |
'pstrong': 2, | |
'llnick': '', | |
'sign': '', | |
'need_sign': '', | |
"isIgnore": '', | |
"full_redirect": '', | |
'popid': '', | |
'callback': '1', | |
'guf': '', | |
'not_duplite_str': '', | |
'need_user_id': '', | |
'poy': '', | |
'gvfdcname': 10, | |
'from_encoding': '', | |
"sub": '', | |
"allp": '', | |
'action': 'Authenticator', | |
'event_submit_do_login': 'anything', | |
'longLogin': 0 | |
} | |
def __init__(self, username, password, | |
rk_username, rk_password, | |
rk_soft_id, rk_soft_key): | |
self.username = username | |
self.password = password | |
self.post_data['TPL_username'] = self.username.encode('gbk') | |
self.post_data['TPL_password'] = self.password | |
curr_path = os.path.abspath(os.path.dirname(__file__)) | |
cookie_dir = os.path.join(curr_path, 'cookies/') | |
if not os.path.exists(cookie_dir): | |
os.mkdir(cookie_dir) | |
self.cookie_path = os.path.join( | |
cookie_dir, '{0}.cookie'.format(self.username.encode('utf-8')) | |
) | |
self.is_login = False | |
self._session = requests.Session() | |
self._session.headers = self.headers | |
self.rk_client = RClient( | |
rk_username, | |
rk_password, | |
rk_soft_id, | |
rk_soft_key | |
) | |
try: | |
self._session.cookies = self._load_cookies(self.cookie_path) | |
except Exception: | |
logger.exception( | |
'Cannot find user cookie file!{0}'.format(self.cookie_path) | |
) | |
if not self.check_login(): | |
self.login() | |
logger.info('load cookies success!') | |
@property | |
def session(self): | |
return self._session | |
def _save_cookies(self, requets_cookiejar, filename): | |
logger.debug('save cookie') | |
with open(filename, 'wb') as f: | |
pickle.dump(requets_cookiejar, f) | |
def _load_cookies(self, filename): | |
with open(filename, 'rb') as f: | |
return pickle.load(f) | |
def _get_check_code(self, url): | |
r = self._session.get(url, stream=True) | |
status = r.status_code | |
if status == 200: | |
image = r.content | |
check_code_ret = self.rk_client.create(image, 3040) | |
check_code = check_code_ret.get('Result', '') | |
if check_code == '': | |
logger.error('failed to rk check code, return:{0}'.format(check_code_ret)) | |
self.post_data["TPL_checkcode"] = check_code | |
self.post_data["need_check_code"] = "true" | |
else: | |
logger.error('failed to get check code, status:{0}'.format(status)) | |
def _handle_response(self, r): | |
login_data = { | |
's': False, | |
'c': 0, | |
'm': '', | |
} | |
status_code = r.status_code | |
if status_code == 200: | |
rsp = json.loads(r.content.decode('gbk')) | |
if rsp.get('state'): | |
logger.debug(u'get token success,user:[{0}]'.format(self.username)) | |
self.token = rsp['data'].get('token', '') | |
url = 'https://passport.alipay.com/mini_apply_st.js?site=0&token={0}&callback=stCallback6'.format(self.token) | |
st_r = self._session.get(url) | |
p = re.compile(r'{\"st\":\"(.*?)\"}') | |
ret = p.findall(st_r.content) | |
st = 0 | |
if len(ret) > 0: | |
logger.debug(u'get st success,user:[{0}]'.format(self.username)) | |
st = ret[0] | |
else: | |
logger.error(u'get st failed,user:[{0}]'.format(self.username)) | |
login_data['m'] = 'get_st_failed' | |
return login_data | |
vst_url = 'http://login.taobao.com/member/vst.htm?st={0}&TPL_username={1}'.format(st, self.username.encode('utf-8')) | |
self._session.get(vst_url) | |
self._save_cookies(self._session.cookies, self.cookie_path) | |
login_data['s'] = True | |
return login_data | |
else: | |
msg = rsp.get('message', '') | |
data = rsp.get('data', {}) | |
code = data.get('code', -1) | |
logger.error(u'login error.code:{0},msg:{1}'.format(code, msg)) | |
login_data['c'] = code | |
login_data['m'] = msg | |
if data: | |
if code in [3425, 1000]: | |
self._get_check_code(data.get('ccurl')) | |
else: | |
logger.error('no response data,rsp:{0}'.format(rsp)) | |
else: | |
logger.error('bad status code:{0}'.format(r.status_code)) | |
login_data['m'] = 'response status code:{0}'.format(status_code) | |
return login_data | |
''' 检测登录状态 ''' | |
def check_login(self): | |
r = self._session.get(self.taobao_check_login_url) | |
try: | |
rsp = json.loads(r.content) | |
except: | |
rsp = {} | |
return rsp.get('isLogin', False) | |
''' 登录taobao ''' | |
def login(self): | |
logger.info(u'User [{0}] login...'.format(self.username)) | |
try_count = 0 | |
while try_count < 5: | |
logger.debug(u'post taobao login,user:[{0}]'.format(self.username)) | |
r = self._session.post( | |
self.taobao_login_url, | |
data=self.post_data, | |
) | |
result = self._handle_response(r) | |
if result['s']: | |
self.is_login = self.check_login() | |
logger.info( | |
u'user [{0}] login {1}.'.format(self.username, 'success' if self.is_login else 'failed') | |
) | |
break | |
elif result['c'] in [3425, 1000]: | |
continue | |
else: | |
try_count += 1 | |
return result | |
if __name__ == '__main__': | |
handler = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s %(levelname)8s %(lineno)-5d %(message)s') # NOQA | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
logger.setLevel(logging.DEBUG) | |
sim = SimTaoBao(u'淘宝用户名', '淘宝密码', '若快用户名', '若快密码', '若快 soft_id', '若快 soft_key') | |
sim.login() | |
print sim.check_login() | |
print sim.session |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment