Skip to content

Instantly share code, notes, and snippets.

@1dot75cm
Forked from mrluanma/requirements.txt
Last active October 28, 2016 15:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 1dot75cm/8ff5266c4f7aa158c2b0a366d21d8bb8 to your computer and use it in GitHub Desktop.
Save 1dot75cm/8ff5266c4f7aa158c2b0a366d21d8bb8 to your computer and use it in GitHub Desktop.
Python 登录新浪微博 (requests 真的比 urllib2 强了 2^^32 倍,兼容 py2/3)
future==0.16.0
lxml
requests==2.11.1
rsa==3.4.2
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from future import standard_library, utils
standard_library.install_aliases()
from builtins import int, input, open, zip, object
from lxml import etree
import re
import rsa
import time
import json
import base64
import binascii
import requests
import argparse
import logging
class WeiboLogin(object):
    '''Sina Weibo web client built on a ``requests`` session.

    Performs the three-step SSO login (pre-login, ticket request, ticket
    validation) and scrapes the search pages on s.weibo.com.
    '''
    # Sent as the ``client`` query parameter of the SSO requests.
    WBCLIENT = 'ssologin.js(v1.4.18)'
    user_agent = (
        'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.11 (KHTML, like Gecko) '
        'Chrome/55.0.2883.21 Safari/536.11')
    pre_login_url = ('http://login.sina.com.cn/sso/prelogin.php?entry=weibo&'
                     'callback=sinaSSOController.preloginCallBack&rsakt=mod&su=%s&client=%s')
    login_ticket_url = 'http://login.sina.com.cn/sso/login.php?client=%s'
    post_login_url = ('http://weibo.com/ajaxlogin.php?framelogin=1&retcode=0&'
                      'callback=parent.sinaSSOController.feedBackUrlCallBack&ticket=%s')
    search_url = 'http://s.weibo.com/weibo/%s&page=%s'
    search_user_url = 'http://s.weibo.com/user/%s&auth=%s&page=%s'

    def __init__(self):
        '''Create the HTTP session and reset the login state.'''
        self.session = requests.session()  # the session keeps cookies between requests
        self.session.headers['User-Agent'] = self.user_agent
        self.get = self.session.get
        self.post = self.session.post
        self.username, self.nick = '', ''
        self.userid, self.status = '', ''
        self.error = ''

    @staticmethod
    def _page_count(total, per_page):
        '''Return how many pages are needed for `total` items, `per_page` each.

        Uses ceiling division so an exact multiple does not gain an extra
        page, and returns 0 instead of dividing by zero on an empty page.
        '''
        if per_page <= 0:
            return 0
        return (total + per_page - 1) // per_page

    @staticmethod
    def _encode_username(username):
        '''Return the ``su`` value: base64 of the URL-quoted user name, as text.

        The result is decoded to text so that interpolating it into a URL on
        Python 3 does not produce a ``b'...'`` literal.
        '''
        quoted = requests.utils.quote(username)
        return base64.b64encode(quoted.encode('utf-8')).decode('ascii')

    # JS equivalent:
    # password = RSAKey.encrypt([me.servertime, me.nonce].join("\t") +"\n"+ password)
    def _encrypt_passwd(self, passwd, pubkey, servertime, nonce):
        '''Encrypt the password and return the ``sp`` form value.

        `pubkey` is the hex-encoded RSA modulus; the public exponent is
        0x10001. The plaintext is ``servertime\\tnonce\\npasswd``. The
        ciphertext is returned hex-encoded.
        '''
        key = rsa.PublicKey(int(pubkey, 16), int('10001', 16))
        message = str(servertime) + '\t' + str(nonce) + '\n' + str(passwd)
        encrypted = rsa.encrypt(message.encode('utf8'), key)
        return binascii.b2a_hex(encrypted)

    def _pre_login(self, username):
        '''Run the pre-login request and return its decoded JSON payload.

        The login step reads ``servertime``, ``nonce``, ``pubkey`` and
        ``rsakv`` from the returned dict.
        '''
        resp = self.get(self.pre_login_url % (
            self._encode_username(username), self.WBCLIENT))
        # The response is a JSONP call; keep only the object literal inside it.
        pre_login_str = re.match(r'.+({.+?})', resp.text).group(1)
        return json.loads(pre_login_str)

    def _get_login_ticket(self, username, password, pre_login):
        '''Post the credentials and return the decoded JSON login response.

        `pre_login` is the dict returned by :meth:`_pre_login`.
        '''
        param = {
            'entry': 'weibo',
            'gateway': 1,
            'from': '',
            'savestate': 0,
            'useticket': 1,
            'vsnf': 1,
            'su': self._encode_username(username),
            'service': 'miniblog',
            'servertime': pre_login['servertime'],
            'nonce': pre_login['nonce'],
            'pwencode': 'rsa2',
            'rsakv': pre_login['rsakv'],
            'sp': self._encrypt_passwd(password, pre_login['pubkey'],
                                       pre_login['servertime'], pre_login['nonce']),
            'encoding': 'UTF-8',
            'domain': 'weibo.com',
            'prelt': 115,
            'returntype': 'TEXT',  # response format: TEXT or META
        }
        resp = self.post(self.login_ticket_url % self.WBCLIENT, param)
        return json.loads(resp.text)

    def _post_login(self, login):
        '''Validate the login ticket and merge the result into `login`.

        When ``login['retcode']`` is not ``'0'`` no request is made and
        `login` is returned unchanged.
        '''
        if login['retcode'] == '0':
            resp = self.get(self.post_login_url % login['ticket'])
            login_str = re.search(r'\(({.+?})\)', resp.text).group(1)
            login.update(json.loads(login_str))
        return login

    def login(self, username, password):
        '''Log in to Weibo and record the outcome on the instance.

        On success ``nick`` and ``userid`` are set. On failure the reason is
        printed and ``SystemExit(1)`` is raised.
        '''
        pre_login = self._pre_login(username)
        login = self._get_login_ticket(username, password, pre_login)
        login = self._post_login(login)
        logging.debug(' Login profile: %s', json.dumps(login, ensure_ascii=False))
        self.username = username
        self.status = login['retcode']
        if self.status == '0':
            self.nick = login['nick']
            self.userid = login['uid']
            print('登录成功!欢迎您,%s。' % self.nick)
        else:
            self.error = login.get('reason', '')
            print('%s。错误代码:%s' % (self.error, self.status))
            # Raise directly: the ``exit`` builtin is only injected by ``site``.
            raise SystemExit(1)

    def _load_fragment(self, page_text, marker):
        '''Return the lxml tree of the HTML fragment embedded for `marker`.

        The page carries each fragment as a JSON object on the line that
        mentions `marker`; the fragment itself is under the ``html`` key.
        '''
        line = re.search('.*%s".*' % marker, page_text).group()
        payload = json.loads(re.search('{.*}', line).group())
        return etree.HTML(payload['html'])

    def search(self, query, search_type='user', user_type='org', page=1):
        '''Search Weibo and return one page of results.

        `search_type` ``'user'`` searches accounts (filtered by `user_type`:
        ``'org'``, ``'person'`` or ``'user'``); any other value searches
        posts. Returns a dict with ``per_page``, ``page``, ``pages``,
        ``total`` and ``results`` (a list of 4-tuples).
        '''
        user_dict = {
            'org': 'org_vip',     # organisations
            'person': 'per_vip',  # verified individuals
            'user': 'ord',        # ordinary users
        }
        quoted_query = requests.utils.quote(query)
        if search_type == 'user':
            url = self.search_user_url % (quoted_query, user_dict[user_type], page)
        else:
            url = self.search_url % (quoted_query, page)

        # Ask the site for 30 items per page before searching.
        self.session.headers['Origin'] = 'http://s.weibo.com'
        self.session.headers['Referer'] = 'http://s.weibo.com/preferences'
        resp = self.post('http://s.weibo.com/ajax/preferences',
                         data={'page_num': 30, '_t': 0})
        logging.debug('设置项数响应: %s', resp.text)

        logging.debug('page: %s', page)
        resp = self.get(url)
        if search_type == 'user':
            tree = self._load_fragment(resp.text, 'pl_user_feedList')
            names = tree.xpath('//p[@class="person_name"]/a[1]/@title')
            hrefs = tree.xpath('//p[@class="person_name"]/a[1]/@href')
            addrs = tree.xpath('//p[@class="person_addr"]/span[2]/text()')
            fans = tree.xpath('//p[@class="person_num"]/span[2]/a/text()')
            search_str = tree.xpath('//div[@class="search_num"]/span/text()')[0]
            results = list(zip(names, hrefs, addrs, fans))
        else:
            tree = self._load_fragment(resp.text, 'pl_weibo_direct')
            names = tree.xpath('//div/a[@class="W_texta W_fb"]/@title')
            hrefs = tree.xpath('//div/a[@class="W_texta W_fb"]/@href')
            # Collapse each post body to single-space separated text.
            contents = [' '.join(node.xpath('string()').split())
                        for node in tree.xpath('//p[@class="comment_txt"]')]
            times = tree.xpath('//div/a[@class="W_textb"]/text()')
            search_str = tree.xpath('//div[@class="search_rese clearfix"]/span/text()')[0]
            results = list(zip(names, hrefs, contents, times))

        total = int(re.search('找到([0-9]+)条结果', search_str).group(1))
        per_page = len(names)
        return {
            'per_page': per_page,
            'page': int(page),
            'pages': self._page_count(total, per_page),
            'total': total,
            'results': results,
        }
def _input(string):
    '''Show `string` as a prompt and return what the user typed.

    On Python 2 the prompt is handed over as UTF-8 bytes; on Python 3 it is
    passed through unchanged.
    '''
    if utils.PY2:
        return input(string.encode('utf8'))
    return input(string)  # py3
def parse_args():
    '''Parse the command-line options and return the resulting namespace.

    The user name and password are mandatory unless ``-i/--interactive`` is
    given; in interactive mode they may be omitted so the caller can prompt
    for them. A missing credential in non-interactive mode ends the program
    through ``parser.error`` (usage message, exit status 2).
    '''
    parser = argparse.ArgumentParser(description='新浪微博登录示例')
    parser.add_argument('-u', '--username', metavar='USER', type=str, dest='username',
                        action='store', required=False, help='微博用户名')
    parser.add_argument('-p', '--password', metavar='PASS', type=str, dest='password',
                        action='store', required=False, help='微博密码')
    parser.add_argument('-o', '--output', metavar='FILE', type=str, dest='file',
                        action='store', required=False, help='输出文件名')
    parser.add_argument('-q', '--query', metavar='QUERY', type=str, dest='query',
                        action='store', required=False, default='书店'.encode('utf8'),
                        help='微博搜索内容')
    parser.add_argument('--page', metavar='PAGE', type=int, dest='page',
                        action='store', required=False, default=1, help='搜索页数')
    parser.add_argument('--search_type', metavar='TYPE', type=str, dest='search_type',
                        action='store', required=False, default='user',
                        help='微博搜索类型[all|user]')
    parser.add_argument('-i', '--interactive', dest='interactive', action='store_true',
                        help='交互式')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true',
                        help='调试信息')
    args = parser.parse_args()
    # Marking -u/-p as unconditionally required would make prompting for
    # them in interactive mode impossible, so enforce them here instead.
    if not args.interactive and not (args.username and args.password):
        parser.error('-u/--username and -p/--password are required '
                     'unless -i/--interactive is given')
    return args
def file_output(weibo, args):
total = 0
while True:
js = weibo.search(args.query, search_type=args.search_type, page=args.page)
with open(args.file, 'a+') as fp:
for i,j,k,l in js['results']:
logging.debug('%s %s %s %s' % (i,j,k,l))
try:
fp.write('%s,%s,%s,%s\n' % (i,j,k,l)) # unicode
except:
fp.write(b'%s,%s,%s,%s\n' % (i.encode('utf8'), j.encode('utf8'),
k.encode('utf8'), l.encode('utf8'))) # 兼容 py2, unicode -> bytes
args.page += 1
total += js['per_page']
logging.info('每页项数: %d' % js['per_page'])
logging.info('页数/总数: %d / %d' % (js['page'], js['pages']))
logging.info('已获取项数: %d' % total)
logging.info('项目总数: %d' % js['total'])
if total >= js['total']:
break
time.sleep(10)
def interactive(weibo, args):
    '''Prompt for search settings in an endless loop and print each result page.

    An empty answer keeps the previous value stored on `args`. The loop has
    no exit condition of its own.
    '''
    while True:
        chosen_type = _input('搜索类型[user|all]: ')
        if chosen_type:
            args.search_type = chosen_type
        chosen_query = _input('搜索内容: ')
        if chosen_query:
            args.query = chosen_query
        chosen_page = _input('页数: ')
        if chosen_page:
            args.page = chosen_page
        js = weibo.search(args.query, search_type=args.search_type, page=args.page)
        for row in js['results']:
            i, j, k, l = row
            print('-> %s (%s)\n %s\n %s\n' % (i, j, k, l))
        print('每页项数: %d' % js['per_page'])
        print('页数/总数: %d / %d' % (js['page'], js['pages']))
        print('搜索结果: %d' % js['total'])
if __name__ == '__main__':
    # Script entry point: parse options, log in, then run the selected modes.
    args = parse_args()
    if args.verbose:
        logging.basicConfig(level=logging.DEBUG)  # enable debug logging
    logging.debug('Arguments: %s' % args)

    if args.interactive:
        # Prompt only for credentials that were not given on the command line.
        args.username = args.username or _input('用户名: ')
        args.password = args.password or _input('密码: ')

    me = WeiboLogin()
    me.login(args.username, args.password)

    if args.interactive:
        interactive(me, args)
    if args.file:
        file_output(me, args)
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
from future import standard_library, utils
standard_library.install_aliases()
from builtins import str, int, input, zip, object
from lxml import etree
from future.moves.urllib import request
from urllib.parse import urlencode
from http.cookiejar import LWPCookieJar as CookieJar
import re
import rsa
import json
import time
import base64
import binascii
import logging
logging.basicConfig(level=logging.DEBUG) # 日志
class WeiboLogin(object):
    '''Sina Weibo web client built on urllib with a cookie-aware opener.

    Performs the three-step SSO login (pre-login, ticket request, ticket
    validation) and scrapes the search pages on s.weibo.com.
    '''
    # Sent as the ``client`` query parameter of the SSO requests.
    WBCLIENT = 'ssologin.js(v1.4.18)'
    user_agent = (
        'Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.11 (KHTML, like Gecko) '
        'Chrome/55.0.2883.21 Safari/536.11')
    # Default request headers; every instance works on its own copy.
    headers = {'User-Agent': user_agent}
    pre_login_url = ('http://login.sina.com.cn/sso/prelogin.php?entry=weibo&'
                     'callback=sinaSSOController.preloginCallBack&rsakt=mod&su=%s&client=%s')
    login_ticket_url = 'http://login.sina.com.cn/sso/login.php?client=%s'
    post_login_url = ('http://weibo.com/ajaxlogin.php?framelogin=1&retcode=0&'
                      'callback=parent.sinaSSOController.feedBackUrlCallBack&ticket=%s')
    search_url = 'http://s.weibo.com/weibo/%s&page=%s'
    search_user_url = 'http://s.weibo.com/user/%s&auth=%s&page=%s'

    def __init__(self):
        '''Install a cookie-keeping opener and reset the login state.'''
        cj = CookieJar()  # stores the cookies
        cookie_support = request.HTTPCookieProcessor(cj)  # binds the jar to a handler
        opener = request.build_opener(cookie_support, request.HTTPHandler)
        request.install_opener(opener)  # used by every later urlopen() call
        # search() adds headers; copy so they do not leak into the class
        # attribute shared by all instances.
        self.headers = dict(self.headers)
        self.username, self.nick = '', ''
        self.userid, self.status = '', ''
        self.error = ''

    @staticmethod
    def _page_count(total, per_page):
        '''Return how many pages are needed for `total` items, `per_page` each.

        Uses ceiling division so an exact multiple does not gain an extra
        page, and returns 0 instead of dividing by zero on an empty page.
        '''
        if per_page <= 0:
            return 0
        return (total + per_page - 1) // per_page

    @staticmethod
    def _encode_username(username):
        '''Return the ``su`` value: base64 of the URL-quoted user name, as text.'''
        quoted = request.quote(username)
        return base64.b64encode(quoted.encode('utf-8')).decode('ascii')

    def _fetch(self, verb, req):
        '''Open `req`, log the request line and return the body decoded as UTF-8.

        The response is always closed, also when reading or decoding fails.
        '''
        resp = request.urlopen(req)
        try:
            # A URL without a path has no fourth component; log an empty one.
            parts = resp.url.split('/', 3)
            path = parts[3] if len(parts) > 3 else ''
            logging.debug(' "%s /%s" %s' % (verb, path, resp.code))
            return resp.read().decode('utf8')
        finally:
            resp.close()

    def get(self, url):
        '''Send an HTTP GET request and return the response body as text.'''
        logging.info(' Starting HTTP connection: %s' % url.split('/')[2])
        req = request.Request(url, headers=self.headers)
        return self._fetch('GET', req)

    def post(self, url, data):
        '''Send `data` as a form-encoded HTTP POST and return the body as text.'''
        logging.info(' Starting HTTP connection: %s' % url.split('/')[2])
        data = _urlencode(data)  # joins the form fields with '&'
        req = request.Request(url, data=data, headers=self.headers)
        return self._fetch('POST', req)

    def _encrypt_passwd(self, passwd, pubkey, servertime, nonce):
        '''Encrypt the password and return the ``sp`` form value.

        `pubkey` is the hex-encoded RSA modulus; the public exponent is
        0x10001. The plaintext is ``servertime\\tnonce\\npasswd``. The
        ciphertext is returned hex-encoded.
        '''
        key = rsa.PublicKey(int(pubkey, 16), int('10001', 16))
        message = str(servertime) + '\t' + str(nonce) + '\n' + str(passwd)
        encrypted = rsa.encrypt(message.encode('utf-8'), key)
        return binascii.b2a_hex(encrypted)

    def _pre_login(self, username):
        '''Run the pre-login request and return its decoded JSON payload.

        The login step reads ``servertime``, ``nonce``, ``pubkey`` and
        ``rsakv`` from the returned dict.
        '''
        # Send the same encoded ``su`` as the login step; a raw user name
        # would put unescaped (possibly non-ASCII) text into the URL.
        resp = self.get(self.pre_login_url % (
            self._encode_username(username), self.WBCLIENT))
        pre_login_str = re.match(r'.+({.+?})', resp).group(1)
        return json.loads(pre_login_str)

    def _get_login_ticket(self, username, password, pre_login):
        '''Post the credentials and return the decoded JSON login response.

        `pre_login` is the dict returned by :meth:`_pre_login`.
        '''
        param = {
            'entry': 'weibo',
            'gateway': 1,
            'from': '',
            'savestate': 0,
            'useticket': 1,
            'vsnf': 1,
            'su': self._encode_username(username),
            'service': 'miniblog',
            'servertime': pre_login['servertime'],
            'nonce': pre_login['nonce'],
            'pwencode': 'rsa2',
            'rsakv': pre_login['rsakv'],
            'sp': self._encrypt_passwd(password, pre_login['pubkey'],
                                       pre_login['servertime'], pre_login['nonce']),
            'encoding': 'UTF-8',
            'domain': 'weibo.com',
            'prelt': 115,
            'returntype': 'TEXT',
        }
        resp = self.post(self.login_ticket_url % self.WBCLIENT, param)
        return json.loads(resp)

    def _post_login(self, login):
        '''Validate the login ticket and merge the result into `login`.

        When ``login['retcode']`` is not ``'0'`` no request is made and
        `login` is returned unchanged.
        '''
        if login['retcode'] == '0':
            resp = self.get(self.post_login_url % login['ticket'])
            login_str = re.search(r'\(({.+?})\)', resp).group(1)
            login.update(json.loads(login_str))
        return login

    def login(self, username, password):
        '''Log in to Weibo and record the outcome on the instance.

        On success ``nick`` and ``userid`` are set. On failure the reason is
        printed and ``SystemExit(1)`` is raised.
        '''
        pre_login = self._pre_login(username)
        login = self._get_login_ticket(username, password, pre_login)
        login = self._post_login(login)
        logging.debug(' Login profile: %s', json.dumps(login))
        self.username = username
        self.status = login['retcode']
        if self.status == '0':
            self.nick = login['nick']
            self.userid = login['uid']
            print('登录成功!欢迎您,%s。' % self.nick)
        else:
            self.error = login.get('reason', '')
            print('%s。错误代码:%s' % (self.error, self.status))
            # Raise directly: the ``exit`` builtin is only injected by ``site``.
            raise SystemExit(1)

    def _load_fragment(self, page_text, marker):
        '''Return the lxml tree of the HTML fragment embedded for `marker`.

        The page carries each fragment as a JSON object on the line that
        mentions `marker`; the fragment itself is under the ``html`` key.
        '''
        line = re.search('.*%s".*' % marker, page_text).group()
        payload = json.loads(re.search('{.*}', line).group())
        return etree.HTML(payload['html'])

    def search(self, query, search_type='user', user_type='org', page=1):
        '''Search Weibo and return one page of results.

        `search_type` ``'user'`` searches accounts (filtered by `user_type`:
        ``'org'``, ``'person'`` or ``'user'``); any other value searches
        posts. Returns a dict with ``per_page``, ``page``, ``pages``,
        ``total`` and ``results`` (a list of 4-tuples).
        '''
        user_dict = {
            'org': 'org_vip',     # organisations
            'person': 'per_vip',  # verified individuals
            'user': 'ord',        # ordinary users
        }
        quoted_query = request.quote(query)
        if search_type == 'user':
            url = self.search_user_url % (quoted_query, user_dict[user_type], page)
        else:
            url = self.search_url % (quoted_query, page)

        # Ask the site for 30 items per page before searching.
        self.headers['Origin'] = 'http://s.weibo.com'
        self.headers['Referer'] = 'http://s.weibo.com/preferences'
        self.headers['X-Requested-With'] = 'XMLHttpRequest'
        resp = self.post('http://s.weibo.com/ajax/preferences',
                         data={'page_num': 30, '_t': 0})
        logging.debug('设置项数响应: %s', resp)

        logging.debug('page: %s', page)
        resp = self.get(url)
        if search_type == 'user':
            tree = self._load_fragment(resp, 'pl_user_feedList')
            names = tree.xpath('//p[@class="person_name"]/a[1]/@title')
            hrefs = tree.xpath('//p[@class="person_name"]/a[1]/@href')
            addrs = tree.xpath('//p[@class="person_addr"]/span[2]/text()')
            fans = tree.xpath('//p[@class="person_num"]/span[2]/a/text()')
            search_str = tree.xpath('//div[@class="search_num"]/span/text()')[0]
            results = list(zip(names, hrefs, addrs, fans))
        else:
            tree = self._load_fragment(resp, 'pl_weibo_direct')
            names = tree.xpath('//div/a[@class="W_texta W_fb"]/@title')
            hrefs = tree.xpath('//div/a[@class="W_texta W_fb"]/@href')
            # Collapse each post body to single-space separated text.
            contents = [' '.join(node.xpath('string()').split())
                        for node in tree.xpath('//p[@class="comment_txt"]')]
            times = tree.xpath('//div/a[@class="W_textb"]/text()')
            search_str = tree.xpath('//div[@class="search_rese clearfix"]/span/text()')[0]
            results = list(zip(names, hrefs, contents, times))

        total = int(re.search('找到([0-9]+)条结果', search_str).group(1))
        per_page = len(names)
        return {
            'per_page': per_page,
            'page': int(page),
            'pages': self._page_count(total, per_page),
            'total': total,
            'results': results,
        }
def _input(string):
    '''Show `string` as a prompt and return what the user typed.

    On Python 2 the prompt is handed over as UTF-8 bytes; on Python 3 it is
    passed through unchanged.
    '''
    if utils.PY2:
        return input(string.encode('utf8'))
    return input(string)  # py3
def _urlencode(string):
    '''Form-encode `string` (the mapping of POST fields) for use as a request body.

    On Python 2 the encoded text is returned as is; on Python 3 it is
    additionally encoded to UTF-8 bytes.
    '''
    encoded = urlencode(string)
    if utils.PY2:
        return encoded
    return encoded.encode('utf8')  # py3
if __name__ == '__main__':
    # Script entry point: log in, then answer search prompts in an endless loop.
    username = _input('用户名: ')
    password = _input('密码: ')
    me = WeiboLogin()
    me.login(username, password)
    # st2/q2 remember the previous answers and serve as defaults. The
    # initial query is encoded once here: encoding on every reuse would be
    # applied again to the already-encoded value and fail on the second
    # empty answer.
    st2, q2 = 'user', '书店'.encode('utf8')
    while True:
        st = _input('搜索类型[user|all]: ') or st2
        q = _input('搜索内容: ') or q2
        page = _input('页数: ') or 1
        st2, q2 = st, q
        js = me.search(q, search_type=st, page=page)
        for i, j, k, l in js['results']:
            print('-> %s (%s)\n %s\n %s\n' % (i, j, k, l))
        print('每页项数: %d' % js['per_page'])
        print('页数/总数: %d / %d' % (js['page'], js['pages']))
        print('搜索结果: %d' % js['total'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment