Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
OAuth包,实现了sina,QQ,网易,搜狐的OAuth认证
# -*- Encoding: utf-8 -*-
import base64
import binascii
import cgi
import hashlib
import hmac
import logging
import time
import urllib
import urlparse
import uuid
from tornado.auth import OAuthMixin, _oauth10a_signature
from tornado import httpclient
from tornado import escape
from tornado.ioloop import IOLoop
class DictObject(dict):
def __getattr__(self, k):
try:
return self[k]
except KeyError:
raise AttributeError('Unknown attribute \'%s\'' % (k,))
def __setattr__(self, k, v):
self[k] = v
def __repr__(self):
return dict.__repr__(self)
def __hash__(self):
return id(self)
class User(DictObject):
pass
class BaseOAuthMixin(OAuthMixin):
_OAUTH_NO_CALLBACKS = False
def authenticate_redirect(self):
http = httpclient.AsyncHTTPClient()
http.fetch(self._oauth_request_token_url(), self.async_callback(
self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None))
def api_request(self, path, callback, access_token=None, post_args=None, **args):
url = self._OAUTH_API_URL % path
if access_token:
all_args = {}
all_args.update(args)
all_args.update(post_args or {})
method = 'POST' if post_args is not None else 'GET'
oauth = self._oauth_request_parameters(
url, access_token, all_args, method=method)
args.update(oauth)
if args:
url += '?' + urllib.urlencode(args)
callback = self.async_callback(self._on_api_request, callback)
http = httpclient.AsyncHTTPClient()
if post_args is not None:
http.fetch(url, method='POST', body=urllib.urlencode(post_args), callback=callback)
else:
http.fetch(url, callback=callback)
def _on_api_request(self, callback, response):
if response.error:
logging.warning('Error response %s fetching %s', response.error,
response.request.url)
callback(None)
return
callback(escape.json_decode(response.body))
def _oauth_consumer_token(self):
return dict(key=self._OAUTH_CONSUMER_KEY, secret=self._OAUTH_CONSUMER_SECRET)
class SinaMixin(BaseOAuthMixin):
_OAUTH_CONSUMER_KEY = 'XXXXXXXXXXX'
_OAUTH_CONSUMER_SECRET = 'XXXXXXXXXXXXXXXXXXXXX'
_OAUTH_REQUEST_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/request_token'
_OAUTH_ACCESS_TOKEN_URL = 'http://api.t.sina.com.cn/oauth/access_token'
_OAUTH_AUTHORIZE_URL = 'http://api.t.sina.com.cn/oauth/authorize'
_OAUTH_AUTHENTICATE_URL = 'http://api.t.sina.com.cn/account/verify_credentials.json'
_OAUTH_API_URL = 'http://api.t.sina.com.cn/%s.json'
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.api_request('users/show/' + access_token['user_id'],
access_token=access_token, callback=callback)
def _parse_user_response(self, callback, user):
parse_user = User()
parse_user.id = user['id']
parse_user.name = user['name']
parse_user.portrait = user['profile_image_url']
parse_user.email = ''
callback(parse_user)
class QQMixin(BaseOAuthMixin):
_OAUTH_CONSUMER_KEY = 'YYYYYYYYYYYYYYYYYYY'
_OAUTH_CONSUMER_SECRET = 'YYYYYYYYYYYYYYYYYYYYY'
_OAUTH_REQUEST_TOKEN_URL = 'http://open.t.qq.com/cgi-bin/request_token'
_OAUTH_ACCESS_TOKEN_URL = 'http://open.t.qq.com/cgi-bin/access_token'
_OAUTH_AUTHORIZE_URL = 'http://open.t.qq.com/cgi-bin/authorize'
_OAUTH_AUTHENTICATE_URL = 'http://open.t.qq.com/api/user/verify?format=json'
_OAUTH_API_URL = 'http://open.t.qq.com/api/%s'
_OAUTH_VERSION = '1.0'
def authorize_redirect(self, callback_uri=None, extra_params=None):
if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
raise Exception("This service does not support oauth_callback")
http = httpclient.AsyncHTTPClient()
http.fetch(self._oauth_request_token_url(callback_uri=callback_uri,
extra_params=extra_params),
self.async_callback(
self._on_request_token,
self._OAUTH_AUTHORIZE_URL,
callback_uri))
def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
if callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params: args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=request_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
if "verifier" in request_token:
args["oauth_verifier"]=request_token["verifier"]
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
def _oauth_request_parameters(self, url, access_token, parameters={}, method="GET"):
consumer_token = self._oauth_consumer_token()
base_args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=access_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
args = {}
args.update(base_args)
args.update(parameters)
signature = _oauth10a_signature(consumer_token, method, url, args,
access_token)
base_args["oauth_signature"] = signature
return base_args
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.api_request('user/info', access_token=access_token, callback=callback, format='json')
def _parse_user_response(self, callback, user):
parse_user = User()
parse_user.id = self.get_argument('openid', None)
parse_user.name = user['data']['name']
parse_user.portrait = user['data']['head'] + '/100'
parse_user.email = user['data']['email']
callback(parse_user)
class NeteaseMixin(BaseOAuthMixin):
_OAUTH_CONSUMER_KEY = 'ZZZZZZZZZZZZZZ'
_OAUTH_CONSUMER_SECRET = 'ZZZZZZZZZZZZZZZZZZZZZZ'
_OAUTH_REQUEST_TOKEN_URL = 'http://api.t.163.com/oauth/request_token'
_OAUTH_ACCESS_TOKEN_URL = 'http://api.t.163.com/oauth/access_token'
_OAUTH_AUTHORIZE_URL = 'http://api.t.163.com/oauth/authenticate'
_OAUTH_AUTHENTICATE_URL = 'http://api.t.163.com/account/verify_credentials.json'
_OAUTH_API_URL = 'http://api.t.163.com/%s.json'
_OAUTH_VERSION = '1.0'
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.api_request('users/show', access_token=access_token, callback=callback)
def _parse_user_response(self, callback, user):
parse_user = User()
parse_user.id = user['id']
parse_user.name = user['name']
parse_user.portrait = user['profile_image_url']
parse_user.email = ''
callback(parse_user)
class SohuMixin(BaseOAuthMixin):
_OAUTH_CONSUMER_KEY = '1111111111111111'
_OAUTH_CONSUMER_SECRET = '111111111111111111111111111111'
_OAUTH_REQUEST_TOKEN_URL = 'http://api.t.sohu.com/oauth/request_token'
_OAUTH_ACCESS_TOKEN_URL = 'http://api.t.sohu.com/oauth/access_token'
_OAUTH_AUTHORIZE_URL = 'http://api.t.sohu.com/oauth/authorize'
_OAUTH_AUTHENTICATE_URL = 'http://api.t.sohu.com/account/verify_credentials.json'
_OAUTH_API_URL = 'http://api.t.sohu.com/%s.json'
_OAUTH_VERSION = '1.0'
def authorize_redirect(self, callback_uri=None, extra_params=None):
if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
raise Exception("This service does not support oauth_callback")
http = httpclient.AsyncHTTPClient()
http.fetch(self._oauth_request_token_url(callback_uri=callback_uri,
extra_params=extra_params),
self.async_callback(
self._on_request_token,
self._OAUTH_AUTHORIZE_URL,
callback_uri))
def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
if callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
if extra_params: args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
def _oauth_access_token_url(self, request_token):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_ACCESS_TOKEN_URL
args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=request_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
if "verifier" in request_token:
args["oauth_verifier"]=request_token["verifier"]
signature = _oauth10a_signature(consumer_token, "GET", url, args,
request_token)
args["oauth_signature"] = signature
return url + "?" + urllib.urlencode(args)
def _oauth_request_parameters(self, url, access_token, parameters={}, method="GET"):
consumer_token = self._oauth_consumer_token()
base_args = dict(
oauth_consumer_key=consumer_token["key"],
oauth_token=access_token["key"],
oauth_signature_method="HMAC-SHA1",
oauth_timestamp=str(int(time.time())),
oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0"),
)
args = {}
args.update(base_args)
args.update(parameters)
signature = _oauth10a_signature(consumer_token, method, url, args,
access_token)
base_args["oauth_signature"] = signature
return base_args
def _oauth_get_user(self, access_token, callback):
callback = self.async_callback(self._parse_user_response, callback)
self.api_request('users/show',
access_token=access_token, callback=callback)
def _parse_user_response(self, callback, user):
parse_user = User()
parse_user.id = user['id']
parse_user.name = user['screen_name']
parse_user.portrait = user['profile_image_url']
parse_user.email = ''
callback(parse_user)
'''
Created on Nov 19, 2011
@author: zhuhua
'''
import app
import oauth
import tornado
from common import flash
from oauth.funcs import check_login
@app.route('^/oauth/sina')
class SinaOAuthHandler(app.BaseHandler, oauth.SinaMixin):
@tornado.web.asynchronous
def get(self):
if self.get_argument("oauth_token", None):
self.get_authenticated_user(self.async_callback(self._on_auth))
return
self.authorize_redirect(callback_uri='/oauth/sina')
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Sina auth failed")
check_login(self, user, 'sina')
@app.route('^/oauth/qq')
class QQOAuthHandler(app.BaseHandler, oauth.QQMixin):
@tornado.web.asynchronous
def get(self):
if self.get_argument("oauth_token", None):
self.get_authenticated_user(self.async_callback(self._on_auth))
return
self.authorize_redirect(callback_uri='/oauth/qq')
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "QQ auth failed")
check_login(self, user, 'qq')
@app.route('^/oauth/163')
class NeteaseOAuthHandler(app.BaseHandler, oauth.NeteaseMixin):
@tornado.web.asynchronous
def get(self):
if self.get_argument("oauth_token", None):
self.get_authenticated_user(self.async_callback(self._on_auth))
return
self.authorize_redirect(callback_uri='/oauth/163')
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "163 auth failed")
check_login(self, user, '163')
@app.route('^/oauth/sohu')
class SohuOAuthHandler(app.BaseHandler, oauth.SohuMixin):
@tornado.web.asynchronous
def get(self):
if self.get_argument("oauth_token", None):
self.get_authenticated_user(self.async_callback(self._on_auth))
return
self.authorize_redirect(callback_uri='/oauth/sohu')
def _on_auth(self, user):
if not user:
raise tornado.web.HTTPError(500, "Sohu auth failed")
check_login(self, user, 'sohu')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.