Skip to content

Instantly share code, notes, and snippets.

@yszou
Created Dec 9, 2013
Embed
What would you like to do?
新浪的SSO授权登录
# -*- coding: utf-8 -*-
import rsa
import re
import time
import json
import traceback
from urllib import urlencode, unquote, quote
from Cookie import SimpleCookie
import tornado
import tornado.ioloop
import tornado.httpclient
import tornado.gen
class Client(object):
def __init__(self):
self.client = tornado.httpclient.AsyncHTTPClient()
self.with_cookie = {}
def wrap(self, func):
def inner(response):
if not response.error:
cookie = response.headers.get('Set-Cookie', '')
if cookie:
pair = [x.split(';')[0] for x in cookie.split(',')]
pair = [x.split('=', 1) for x in pair if '=' in x]
pair = ['%s=%s' % tuple(x) for x in pair]
cookie = '; '.join(pair)
func(response)
return inner
def open(self, url, params={}, method='GET', callback=None, **kargs):
params = params.copy()
if '?' in url:
pair = [x.split('=') for x in url.split('?', 1)[1].split('&')]
for k, v in pair:
params[k] = unquote(v)
url = url.split('?', 1)[0]
body = None
if method == 'GET':
if params:
url = url + '?' + urlencode(params)
else:
body = urlencode(params)
headers = {
'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux i686; rv:16.0) Gecko/20100101 Firefox/16.0',
'Connection': 'close',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-cn,zh;q=0.8,en-us;q=0.5,ja;q=0.3',
'Accept': '*/*',
}
if self.with_cookie:
pair = ['%s=%s' % (k, v) for k, v in self.with_cookie.items()]
cookie = '; '.join(pair)
headers['Cookie'] = cookie
if 'headers' in kargs:
headers.update(kargs.pop('headers'))
self.client.fetch(url, body=body, method=method, headers=headers,
callback=self.wrap(callback), **kargs)
class NavigatorPage(object):
def __init__(self, url):
self.client = Client()
self.url = url
@tornado.gen.engine
def find_login_url(self, callback):
res = yield tornado.gen.Task(self.client.open, self.url)
res = yield tornado.gen.Task(self.client.open, 'http://cloudbbs.org/xwb.php?m=xwbAuth.login', follow_redirects=False)
cookie = res.headers.get('Set-Cookie', '')
if cookie:
pair = [x.split(';')[0] for x in cookie.split(',')]
pair = [x.split('=', 1) for x in pair if '=' in x]
self.client.with_cookie.update(dict(pair))
callback(res.headers['Location'])
@tornado.gen.engine
def login(self, url, callback):
res = yield tornado.gen.Task(self.client.open, url)
cookie = res.headers.get('Set-Cookie', '')
if cookie:
pair = [x.split(';')[0] for x in cookie.split(',')]
pair = [x.split('=', 1) for x in pair if '=' in x]
self.client.with_cookie.update(dict(pair))
callback(self)
def open(self, *args, **kargs):
self.client.open(*args, **kargs)
@tornado.gen.engine
def reply(self, fid, thread, post, msg, callback):
if post:
url = self.url + '?' + ('mod=post&action=reply&fid=%s&tid=%s&reppost=%s' % (fid, thread, post))
else:
url = self.url + '?' + ('mod=post&action=reply&fid=%s&tid=%s' % (fid, thread))
res = yield tornado.gen.Task(self.client.open, url)
formhash = re.findall('name="formhash".*?value="(.*?)"', res.body)[0]
posttime = re.findall('name="posttime".*?value="(.*?)"', res.body)[0]
noticeauthor = ''; noticeauthormsg = ''; reppid = ''; reppost = ''
if post:
try:
noticeauthor = re.findall('name="noticeauthor".*?value="(.*?)"', res.body)[0]
noticeauthormsg = re.findall('name="noticeauthormsg".*?value="(.*?)"', res.body, re.S)[0]
reppid = re.findall('name="reppid".*?value="(.*?)"', res.body)[0]
reppost = re.findall('name="reppost".*?value="(.*?)"', res.body)[0]
except:
traceback.print_exc()
callback((0, 2))
return
p = {
'checkbox': '0',
'formhash': formhash,
'message': msg,
'noticeauthor': noticeauthor,
'noticeauthormsg': noticeauthormsg,
'noticestrimstr': '',
'posttime': posttime,
'reppid': reppid,
'reppost': reppost,
'save': '',
'subject': '',
'usesig': '0',
'wysiwyg': '0',
}
url += '&replysubmit=yes'
res = yield tornado.gen.Task(self.client.open, url, p, 'POST', follow_redirects=False)
if res.code == 301:
redirect = res.headers.get('Location', '')
if redirect:
pid = re.findall('pid=(\d*?)', redirect)[0]
callback((pid, 0))
else:
callback((0, 0))
else:
callback((0, 1))
class LoginPage(object):
def __init__(self):
self.client = Client()
self.appkey = ''
self.nonce = ''
self.rsakv = ''
self.exectime = ''
self.servertime = ''
self.showpin = ''
self.pubkey = ''
self.sp = ''
self.su = ''
self.state = ''
self.redirect_uri = ''
self.response_type = ''
self.client_id = ''
self.userId = ''
self.uid = ''
self.ticket = ''
self.cookie = ''
@tornado.gen.engine
def open(self, url, callback):
self.url = url
pair = [x.split('=') for x in url.split('?', 1)[1].split('&')]
pair = dict(pair)
for k, v in pair.items():
pair[k] = unquote(v)
self.state = pair['state']
self.redirect_uri = pair['redirect_uri']
self.response_type = pair['response_type']
self.client_id = pair['client_id']
res = yield tornado.gen.Task(self.client.open, self.url)
self.appkey = re.findall('name="appkey62" value="(.*?)"', res.body)[0]
callback(res.body)
@tornado.gen.engine
def get_oauth2web_js(self, callback):
url = 'https://api.weibo.com/oauth2/js/oauth2Web.js'
res = yield tornado.gen.Task(self.client.open, url)
callback(res.body)
@tornado.gen.engine
def get_ssologin_js(self, callback):
url = 'https://api.weibo.com/oauth2/js/sso/ssologin.js'
res = yield tornado.gen.Task(self.client.open, url)
callback(res.body)
@tornado.gen.engine
def prelogin(self, username, callback):
url = 'https://login.sina.com.cn/sso/prelogin.php'
p = {
'entry': 'openapi',
'callback': 'sinaSSOController.preloginCallBack',
'su': quote(username).encode('base64').rstrip(),
'rsakt': 'mod',
'checkpin': '1',
'client': 'ssologin.js(v1.4.5)',
'_': int(time.time()) * 1000,
}
headers = {
'Referer': self.url,
}
res = yield tornado.gen.Task(self.client.open, url, p, headers=headers)
data = re.findall('sinaSSOController.preloginCallBack\((.*?)\)', res.body)[0]
data = json.loads(data)
self.userId = username
self.nonce = data['nonce']
self.rsakv = data['rsakv']
self.exectime = data['exectime']
self.servertime = data['servertime']
self.showpin = data['showpin']
self.pubkey = data['pubkey']
cookie = res.headers.get('Set-Cookie', '')
if cookie:
pair = [x.split(';')[0] for x in cookie.split(',')]
pair = [x.split('=', 1) for x in pair if '=' in x]
pair = ['%s=%s' % tuple(x) for x in pair]
cookie = '; '.join(pair)
self.cookie = cookie
callback(res.body)
@tornado.gen.engine
def login(self, callback):
url = 'https://login.sina.com.cn/sso/login.php'
p = {
'_': int(time.time()) * 1000,
'appkey': self.appkey,
'callback': 'sinaSSOController.loginCallBack',
'cdult': '2',
'client': 'ssologin.js(v1.4.5)',
'ct': '1800',
'domain': 'weibo.com',
'door': '',
'encoding': 'UTF-8',
'entry': 'openapi',
'from': '',
'gateway': '1',
'nonce': self.nonce,
'pagerefer': 'http://cloudbbs.org/forum.php',
'prelt': '80',
'pwencode': 'rsa2',
'returntype': 'TEXT',
'rsakv': self.rsakv,
's': '1',
'savestate': '0',
'servertime': self.servertime,
'service': 'miniblog',
'sp': self.sp,
'su': self.su,
'useticket': '1',
'vsnf': '1',
'vsnval': '',
}
headers = {
'Cookie': self.cookie,
'Referer': self.url,
}
res = yield tornado.gen.Task(self.client.open, url, p, headers=headers)
data = re.findall('sinaSSOController.loginCallBack\((.*?)\)', res.body)[0]
data = json.loads(data)
self.uid = data['uid']
self.ticket = data['ticket']
cookie = res.headers.get('Set-Cookie', '')
if cookie:
pair = [x.split(';')[0] for x in cookie.split(',')]
pair = [x.split('=', 1) for x in pair if '=' in x]
pair = ['%s=%s' % tuple(x) for x in pair]
cookie = '; '.join(pair)
self.cookie = cookie
callback(res.body)
@tornado.gen.engine
def auth(self, callback):
url = 'https://api.weibo.com/oauth2/authorize'
p = {
'action': 'login',
'appkey62': self.appkey,
'client_id': self.client_id,
'display': 'default',
'from': '',
'isLoginSina': '',
'passwd': '',
'redirect_uri': self.redirect_uri,
'regCallback': 'https%3A%2F%2Fapi.weibo.com%2F2%2Foauth2%2Fauthorize%3Fclient_id%3D2956016099%26response_type%3Dcode%26display%3Ddefault%26redirect_uri%3Dhttp%3A%2F%2Fcloudbbs.org%2Fxwb.php%3Fm%3DxwbAuth.authCallBack%26from%3D%26with_cookie%3D',
'response_type': self.response_type,
'scope': '',
'state': self.state,
'ticket': self.ticket,
'userId': self.userId,
'verifyToken': 'null',
'withOfficalAccount': '',
'withOfficalFlag': '0',
}
headers = {
'Cookie': self.cookie,
'Referer': self.url,
}
res = yield tornado.gen.Task(self.client.open, url, p, method="POST", headers=headers, follow_redirects=False)
callback(res.headers['Location'])
@tornado.gen.engine
def reply(url, username, password, fid, thread, post, msg, callback):
app = NavigatorPage(url)
login_url = yield tornado.gen.Task(app.find_login_url)
page = LoginPage()
yield tornado.gen.Task(page.open, login_url)
yield tornado.gen.Task(page.prelogin, username)
message = '\t'.join([str(page.servertime), str(page.nonce)]) + '\n' + str(password)
pubkey = rsa.PublicKey(int(page.pubkey, 16), int('10001', 16))
sp = rsa.encrypt(message, pubkey).encode('hex')
page.sp = sp
page.su = quote(username).encode('base64').rstrip()
yield tornado.gen.Task(page.login)
redirect = yield tornado.gen.Task(page.auth)
app = yield tornado.gen.Task(app.login, redirect)
pid, error = yield tornado.gen.Task(app.reply, fid, thread, post, msg)
callback((pid, error))
if __name__ == '__main__':
IL = tornado.ioloop.IOLoop.instance()
@tornado.gen.engine
def main(username, password):
app = NavigatorPage('http://cloudbbs.org/forum.php')
login_url = yield tornado.gen.Task(app.find_login_url)
page = LoginPage()
yield tornado.gen.Task(page.open, login_url)
#yield tornado.gen.Task(page.get_oauth2web_js)
#yield tornado.gen.Task(page.get_ssologin_js)
yield tornado.gen.Task(page.prelogin, username)
message = '\t'.join([str(page.servertime), str(page.nonce)]) + '\n' + str(password)
pubkey = rsa.PublicKey(int(page.pubkey, 16), int('10001', 16))
sp = rsa.encrypt(message, pubkey).encode('hex')
page.sp = sp
page.su = quote(username).encode('base64').rstrip()
yield tornado.gen.Task(page.login)
redirect = yield tornado.gen.Task(page.auth)
app = yield tornado.gen.Task(app.login, redirect)
res = yield tornado.gen.Task(app.reply, 37, 19294, 0, '[color=red]不错[/color]')
main('xxx@gmail.com', '')
IL.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment