Created
November 20, 2011 15:39
-
-
Save fanzeyi/1380373 to your computer and use it in GitHub Desktop.
Fanfou OAuth SDK for Google App Engine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class OAuthHandler(webapp.RequestHandler): | |
def get(self): | |
query_string = cgi.parse_qs(self.request.query_string, keep_blank_values = False) | |
oauth = FanfouOAuth(CONSUMER_KEY, CONSUMER_SECRET) | |
if 'oauth_token' in query_string: | |
self.response.out.write(oauth.get_access_token(self, query_string['oauth_token'][0])) | |
return | |
oauth.authenticate_redirect(self) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import uuid | |
import hmac | |
import time | |
import base64 | |
import urllib | |
import hashlib | |
import binascii | |
import urlparse | |
import logging | |
from cookies import Cookies | |
from google.appengine.api import urlfetch | |
try: | |
from urlparse import parse_qs # Python 2.6+ | |
except ImportError: | |
from cgi import parse_qs | |
if str is unicode: | |
def b(s): | |
return s.encode('latin1') | |
bytes_type = bytes | |
else: | |
def b(s): | |
return s | |
bytes_type = str | |
def _oauth_escape(val): | |
if isinstance(val, unicode): | |
val = val.encode("utf-8") | |
return urllib.quote(val, safe="~") | |
_UTF8_TYPES = (bytes, type(None)) | |
def escape_utf8(value): | |
if isinstance(value, _UTF8_TYPES): | |
return value | |
# assert isinstance(value, unicode) | |
return value.encode("utf-8") | |
def _signature_request(consumer_token, method, url, args = {}, token = None): | |
parts = urlparse.urlparse(url) | |
scheme, netloc, path = parts[:3] | |
normalized_url = scheme.lower() + "://" + netloc.lower() + path | |
base_elems = [] | |
base_elems.append(method.upper()) | |
base_elems.append(normalized_url) | |
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v))) | |
for k, v in sorted(args.items()))) | |
base_string = "&".join(_oauth_escape(e) for e in base_elems) | |
key_elems = [escape_utf8(urllib.quote(consumer_token["secret"], safe='~'))] | |
key_elems.append(escape_utf8(urllib.quote(token["secret"], safe='~') if token else "")) | |
key = b("&").join(key_elems) | |
hash = hmac.new(key, escape_utf8(base_string), hashlib.sha1) | |
return binascii.b2a_base64(hash.digest())[:-1] | |
def _oauth_parse_response(body): | |
p = parse_qs(body, keep_blank_values=False) | |
token = dict(key=p[b("oauth_token")][0], secret=p[b("oauth_token_secret")][0]) | |
# Add the extra parameters the Provider included to the token | |
special = (b("oauth_token"), b("oauth_token_secret")) | |
token.update((k, p[k][0]) for k in p if k not in special) | |
return token | |
class FanfouOAuth(object): | |
_API_BASE_URL = "http://api.fanfou.com" | |
_REQUEST_TOKEN_URL = "http://fanfou.com/oauth/request_token" | |
_AUTHENTICATE_URL = "http://fanfou.com/oauth/authenticate" | |
_ACCESS_TOKEN_URL = "http://fanfou.com/oauth/access_token" | |
def __init__(self, key, secret, callback = None): | |
self.key = key | |
self.secret = secret | |
self.callback = callback | |
def _get_consumer_token(self): | |
return dict(key = self.key, | |
secret = self.secret) | |
def _get_access_token(self): | |
if self._ACCESS_TOKEN_KEY and self._ACCESS_TOKEN_SECRET: | |
return dict( key = self._ACCESS_TOKEN_KEY, | |
secret = self._ACCESS_TOKEN_SECRET) | |
return None | |
def _oauth_request_parameters(self, url, access_token, parameters={}, | |
method="GET"): | |
consumer_token = self._get_consumer_token() | |
base_args = dict( | |
oauth_consumer_key = consumer_token["key"], | |
oauth_token = access_token["key"], | |
oauth_signature_method = "HMAC-SHA1", | |
oauth_timestamp = str(int(time.time())), | |
oauth_nonce = binascii.b2a_hex(uuid.uuid4().bytes), | |
oauth_version = "1.0a", | |
) | |
args = {} | |
args.update(base_args) | |
args.update(parameters) | |
signature = _signature_request(consumer_token, method, url, args, | |
access_token) | |
base_args["oauth_signature"] = signature | |
return base_args | |
def set_access_token(self, key, secret): | |
self._ACCESS_TOKEN_KEY = key | |
self._ACCESS_TOKEN_SECRET = secret | |
def get_access_token(self, response, request_key): | |
cookies = Cookies(response, max_age = 3600, path = "/") | |
request_cookie = cookies['_oauth_request_token'] | |
cookie_key, cookie_secret = [base64.b64decode(escape_utf8(i)) for i in request_cookie.split("|")] | |
del cookies['_oauth_request_token'] | |
if cookie_key != request_key: | |
logging.info((cookie_key, request_key, request_cookie)) | |
logging.warning("Request token does not match cookie") | |
return | |
token = dict(key=cookie_key, secret=cookie_secret) | |
consumer_token = self._get_consumer_token() | |
url = self._ACCESS_TOKEN_URL | |
args = dict( | |
oauth_consumer_key = consumer_token["key"], | |
oauth_token = token['key'], | |
oauth_signature_method = "HMAC-SHA1", | |
oauth_timestamp = str(int(time.time())), | |
oauth_nonce = binascii.b2a_hex(uuid.uuid4().bytes), | |
oauth_version = "1.0a" | |
) | |
signature = _signature_request(consumer_token, "GET", url, args, token) | |
args["oauth_signature"] = signature | |
url = url + "?" + urllib.urlencode(args) | |
result = urlfetch.fetch(url, method = urlfetch.GET) | |
return result.content | |
def authenticate_redirect(self, response): | |
request_token_url = self._get_request_token() | |
result = urlfetch.fetch(request_token_url) | |
if result.status_code != 200: | |
raise Exception("Could not get request token") | |
cookies = Cookies(response, max_age = 3600, path = "/") | |
token = _oauth_parse_response(result.content) | |
data = (base64.b64encode(token["key"]) + b("|") + \ | |
base64.b64encode(token["secret"])) | |
cookies["_oauth_request_token"] = data | |
args = dict(oauth_token = token["key"]) | |
response.redirect(self._AUTHENTICATE_URL + "?" + urllib.urlencode(args)) | |
def fanfou_request(self, path, access_token = None, \ | |
post_args = None, **args): | |
url = self._API_BASE_URL + path + ".xml" | |
if not access_token: | |
access_token = self._get_access_token() | |
if access_token: | |
all_args = {} | |
all_args.update(args) | |
all_args.update(post_args or {}) | |
method = "POST" if post_args is not None else "GET" | |
oauth = self._oauth_request_parameters(url, access_token, all_args, method = method) | |
args.update(oauth) | |
if args: url += "?" + urllib.urlencode(args) | |
if method == "GET": | |
result = urlfetch.fetch(url, method = urlfetch.GET) | |
else: | |
result = urlfetch.fetch(url, method = urlfetch.POST, \ | |
payload = urllib.urlencode(post_args)) | |
return result.content | |
def _get_request_token(self): | |
token = self._get_consumer_token() | |
url = self._REQUEST_TOKEN_URL | |
args = dict( | |
oauth_consumer_key = token['key'], | |
oauth_signature_method = 'HMAC-SHA1', | |
oauth_timestamp = str(int(time.time())), | |
oauth_nonce = binascii.b2a_hex(uuid.uuid4().bytes), | |
oauth_version = "1.0a" | |
) | |
signature = _signature_request(token, "GET", url, args) | |
args["oauth_signature"] = signature | |
return url + "?" + urllib.urlencode(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment