Skip to content

Instantly share code, notes, and snippets.

@fanzeyi
Created November 20, 2011 15:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fanzeyi/1380373 to your computer and use it in GitHub Desktop.
Save fanzeyi/1380373 to your computer and use it in GitHub Desktop.
Fanfou OAuth SDK for Google App Engine
class OAuthHandler(webapp.RequestHandler):
    """Example request handler that drives the Fanfou OAuth flow.

    Without an ``oauth_token`` query parameter the visitor is redirected to
    Fanfou to authorize; with one, the request token is exchanged and the
    raw result is written to the response.
    """

    def get(self):
        """Handle GET: start the OAuth flow or complete the token exchange."""
        params = cgi.parse_qs(self.request.query_string,
                              keep_blank_values=False)
        client = FanfouOAuth(CONSUMER_KEY, CONSUMER_SECRET)

        if 'oauth_token' not in params:
            # First visit: no request token yet, so start the flow.
            client.authenticate_redirect(self)
            return

        # Callback from the provider: exchange the request token.
        request_key = params['oauth_token'][0]
        self.response.out.write(client.get_access_token(self, request_key))
# -*- coding: utf-8 -*-
import uuid
import hmac
import time
import base64
import urllib
import hashlib
import binascii
import urlparse
import logging
from cookies import Cookies
from google.appengine.api import urlfetch
try:
from urlparse import parse_qs # Python 2.6+
except ImportError:
from cgi import parse_qs
# Work out whether the native ``str`` type is the text (unicode) type.
# Where the ``unicode`` builtin does not exist (Python 3) the bare
# ``str is unicode`` comparison raises NameError, so treat that case as
# "str is text" instead of crashing at import time.
try:
    _STR_IS_TEXT = str is unicode
except NameError:
    _STR_IS_TEXT = True

if _STR_IS_TEXT:
    def b(s):
        """Return the byte-string form of the native string literal *s*."""
        return s.encode('latin1')
    bytes_type = bytes
else:
    def b(s):
        """Return *s* unchanged: native strings are already byte strings."""
        return s
    bytes_type = str
def _oauth_escape(val):
if isinstance(val, unicode):
val = val.encode("utf-8")
return urllib.quote(val, safe="~")
# Types that escape_utf8() hands back untouched.
_UTF8_TYPES = (bytes, type(None))


def escape_utf8(value):
    """Return *value* as a UTF-8 encoded byte string.

    ``bytes`` instances and ``None`` are returned as-is; anything else is
    expected to provide ``.encode()`` and is encoded as UTF-8.
    """
    if not isinstance(value, _UTF8_TYPES):
        return value.encode("utf-8")
    return value
def _signature_request(consumer_token, method, url, args=None, token=None):
    """Compute the HMAC-SHA1 OAuth signature for a request.

    Args:
        consumer_token: dict whose ``"secret"`` entry is the consumer secret.
        method: HTTP method name; upper-cased before signing.
        url: request URL. Only scheme, host and path are signed; any query
            string or fragment in *url* is ignored.
        args: mapping of request parameters to sign (defaults to none).
        token: optional dict whose ``"secret"`` entry is the token secret.

    Returns:
        The base64-encoded signature, without the trailing newline that
        ``binascii.b2a_base64`` appends.
    """
    if args is None:
        args = {}

    def _as_string(value):
        # Leave unicode alone: _oauth_escape() UTF-8 encodes it, whereas
        # str() would raise UnicodeEncodeError on non-ASCII text.
        if isinstance(value, unicode):
            return value
        return str(value)

    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    # Names and values are both percent-encoded, then sorted in encoded
    # form, before being joined into the parameter string.
    encoded_pairs = sorted(
        (_oauth_escape(_as_string(k)), _oauth_escape(_as_string(v)))
        for k, v in args.items())
    parameter_string = "&".join("%s=%s" % pair for pair in encoded_pairs)

    base_elems = [method.upper(), normalized_url, parameter_string]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    # Signing key: escaped consumer secret, "&", escaped token secret
    # (empty when there is no token yet).
    key_elems = [escape_utf8(urllib.quote(consumer_token["secret"], safe='~'))]
    key_elems.append(escape_utf8(
        urllib.quote(token["secret"], safe='~') if token else ""))
    key = b("&").join(key_elems)

    digest = hmac.new(key, escape_utf8(base_string), hashlib.sha1)
    return binascii.b2a_base64(digest.digest())[:-1]
def _oauth_parse_response(body):
    """Turn a form-encoded token response into a token dict.

    The result maps ``"key"`` to the ``oauth_token`` value and ``"secret"``
    to the ``oauth_token_secret`` value. Every other field in *body* is
    copied in under its own name, using its first value. Raises ``KeyError``
    when either token field is missing.
    """
    fields = parse_qs(body, keep_blank_values=False)
    key_field = b("oauth_token")
    secret_field = b("oauth_token_secret")

    token = dict(key=fields[key_field][0], secret=fields[secret_field][0])

    # Carry over any extra parameters the provider included.
    for name in fields:
        if name not in (key_field, secret_field):
            token[name] = fields[name][0]
    return token
class FanfouOAuth(object):
    """Small OAuth client for the Fanfou API on Google App Engine.

    Typical flow: ``authenticate_redirect()`` fetches a request token,
    stores it in a cookie and redirects the visitor to Fanfou;
    ``get_access_token()`` then exchanges that request token; finally
    ``set_access_token()`` + ``fanfou_request()`` make signed API calls.

    ``callback`` is stored on the instance but is not used by any method
    of this class.
    """

    _API_BASE_URL = "http://api.fanfou.com"
    _REQUEST_TOKEN_URL = "http://fanfou.com/oauth/request_token"
    _AUTHENTICATE_URL = "http://fanfou.com/oauth/authenticate"
    _ACCESS_TOKEN_URL = "http://fanfou.com/oauth/access_token"

    # Defaults so _get_access_token() is safe to call before
    # set_access_token() has been used.
    _ACCESS_TOKEN_KEY = None
    _ACCESS_TOKEN_SECRET = None

    def __init__(self, key, secret, callback=None):
        """Remember the consumer *key*/*secret* and optional *callback*."""
        self.key = key
        self.secret = secret
        self.callback = callback

    def _get_consumer_token(self):
        """Return the consumer credentials as a ``key``/``secret`` dict."""
        return dict(key=self.key, secret=self.secret)

    def _get_access_token(self):
        """Return the stored access token dict, or None when none is set."""
        if self._ACCESS_TOKEN_KEY and self._ACCESS_TOKEN_SECRET:
            return dict(key=self._ACCESS_TOKEN_KEY,
                        secret=self._ACCESS_TOKEN_SECRET)
        return None

    @staticmethod
    def _oauth_base_args(consumer_token):
        """Build the protocol parameters shared by every signed request."""
        return dict(
            oauth_consumer_key=consumer_token["key"],
            oauth_signature_method="HMAC-SHA1",
            oauth_timestamp=str(int(time.time())),
            oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
            oauth_version="1.0a",
        )

    @staticmethod
    def _utf8_value(value):
        """UTF-8 encode unicode values; return anything else unchanged."""
        if isinstance(value, unicode):
            return value.encode("utf-8")
        return value

    def _oauth_request_parameters(self, url, access_token, parameters=None,
                                  method="GET"):
        """Return the signed OAuth parameters for a request.

        The signature covers the OAuth parameters plus *parameters*; the
        returned dict holds only the OAuth parameters (including
        ``oauth_signature``), not the caller's own parameters.
        """
        consumer_token = self._get_consumer_token()
        base_args = self._oauth_base_args(consumer_token)
        base_args["oauth_token"] = access_token["key"]

        args = {}
        args.update(base_args)
        args.update(parameters or {})
        base_args["oauth_signature"] = _signature_request(
            consumer_token, method, url, args, access_token)
        return base_args

    def set_access_token(self, key, secret):
        """Store the access token used by later ``fanfou_request`` calls."""
        self._ACCESS_TOKEN_KEY = key
        self._ACCESS_TOKEN_SECRET = secret

    def get_access_token(self, response, request_key):
        """Exchange the request token kept in the cookie for an access token.

        *response* is handed to ``Cookies`` to read and clear the
        ``_oauth_request_token`` cookie. Returns the raw body of the access
        token response, or None when the cookie's token key does not match
        *request_key*.
        """
        cookies = Cookies(response, max_age=3600, path="/")
        request_cookie = cookies['_oauth_request_token']
        cookie_key, cookie_secret = [
            base64.b64decode(escape_utf8(part))
            for part in request_cookie.split("|")]
        # The request token is single-use: drop the cookie either way.
        del cookies['_oauth_request_token']

        if cookie_key != request_key:
            logging.info((cookie_key, request_key, request_cookie))
            logging.warning("Request token does not match cookie")
            return None

        token = dict(key=cookie_key, secret=cookie_secret)
        url = self._ACCESS_TOKEN_URL
        args = self._oauth_request_parameters(url, token)
        result = urlfetch.fetch(url + "?" + urllib.urlencode(args),
                                method=urlfetch.GET)
        return result.content

    def authenticate_redirect(self, response):
        """Fetch a request token, store it in a cookie and redirect.

        Raises ``Exception`` when the request-token call does not return
        HTTP 200.
        """
        request_token_url = self._get_request_token()
        result = urlfetch.fetch(request_token_url)
        if result.status_code != 200:
            raise Exception("Could not get request token")

        cookies = Cookies(response, max_age=3600, path="/")
        token = _oauth_parse_response(result.content)
        # Cookie format: base64(key) "|" base64(secret).
        data = (base64.b64encode(token["key"]) + b("|") +
                base64.b64encode(token["secret"]))
        cookies["_oauth_request_token"] = data

        args = dict(oauth_token=token["key"])
        response.redirect(self._AUTHENTICATE_URL + "?" + urllib.urlencode(args))

    def fanfou_request(self, path, access_token=None,
                       post_args=None, **args):
        """Call ``<API base><path>.xml`` and return the raw response body.

        The request is a POST when *post_args* is not None (even if empty),
        otherwise a GET. It is signed when *access_token* is given or one
        was stored with ``set_access_token()``; without any token it is
        sent unsigned. Unicode argument values are sent as UTF-8.
        """
        url = self._API_BASE_URL + path + ".xml"
        # Decide the method up front so it is defined even when the request
        # ends up unsigned.
        method = "POST" if post_args is not None else "GET"

        # Encode unicode values once so signing and urlencode() (which both
        # call str() on values) see identical UTF-8 bytes.
        args = dict((k, self._utf8_value(v)) for k, v in args.items())
        if post_args is not None:
            pairs = (post_args.items() if hasattr(post_args, "items")
                     else post_args)
            post_args = [(k, self._utf8_value(v)) for k, v in pairs]

        if not access_token:
            access_token = self._get_access_token()
        if access_token:
            all_args = {}
            all_args.update(args)
            all_args.update(post_args or {})
            oauth = self._oauth_request_parameters(
                url, access_token, all_args, method=method)
            args.update(oauth)

        if args:
            url += "?" + urllib.urlencode(args)
        if method == "GET":
            result = urlfetch.fetch(url, method=urlfetch.GET)
        else:
            result = urlfetch.fetch(url, method=urlfetch.POST,
                                    payload=urllib.urlencode(post_args))
        return result.content

    def _get_request_token(self):
        """Return the signed URL used to obtain a request token."""
        consumer_token = self._get_consumer_token()
        url = self._REQUEST_TOKEN_URL
        args = self._oauth_base_args(consumer_token)
        args["oauth_signature"] = _signature_request(
            consumer_token, "GET", url, args)
        return url + "?" + urllib.urlencode(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment