Skip to content

Instantly share code, notes, and snippets.

@joerussbowman
Created January 10, 2012 02:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joerussbowman/1586593 to your computer and use it in GitHub Desktop.
Save joerussbowman/1586593 to your computer and use it in GitHub Desktop.
Direction I am thinking of taking auth.py in Tornado
import binascii
import hashlib
import hmac
import logging
import time
import urllib
import urlparse
import uuid
from tornado import httpclient
from tornado import escape
from tornado import gen
from tornado.util import bytes_type, b
class OAuth(object):
    """Abstract implementation of OAuth.

    Used to build on to make requests, without requiring it to be
    a mixin for a request handler. For example this makes it possible
    to call twitter_request from somewhere other than a request.

    Subclasses must provide ``_oauth_consumer_token()`` returning a dict
    with ``"key"`` and ``"secret"`` entries, and may set ``_OAUTH_VERSION``
    (defaults to ``"1.0a"``).

    See Twitter below for example implementations.
    """

    def _oauth_request_parameters(self, url, access_token, parameters=None,
                                  method="GET"):
        """Returns the OAuth parameters as a dict for the given request.

        url is the request URL and method the HTTP verb; both feed the
        signature. access_token is a dict whose "key" entry is sent as
        oauth_token and which is handed to the signing function.

        parameters should include all POST arguments and query string
        arguments that will be sent with the request. They are included in
        the signature but are not part of the returned dict. None (the
        default) is treated as "no extra arguments".
        """
        consumer_token = self._oauth_consumer_token()
        # Read the version once so the advertised oauth_version and the
        # signing algorithm chosen below can never disagree.
        oauth_version = getattr(self, "_OAUTH_VERSION", "1.0a")
        base_args = dict(
            oauth_consumer_key=consumer_token["key"],
            oauth_token=access_token["key"],
            oauth_signature_method="HMAC-SHA1",
            oauth_timestamp=str(int(time.time())),
            oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
            oauth_version=oauth_version,
        )
        # The signature covers the OAuth arguments plus every request
        # argument; request arguments win on a name clash, as before.
        args = dict(base_args)
        args.update(parameters or {})
        if oauth_version == "1.0a":
            sign = _oauth10a_signature
        else:
            sign = _oauth_signature
        base_args["oauth_signature"] = sign(
            consumer_token, method, url, args, access_token)
        return base_args
class Twitter(OAuth):
    """Twitter flavour of the OAuth helper.

    Consumer credentials are either passed to the constructor or, when the
    object exposes ``require_setting``, looked up in ``self.settings``.
    """

    def __init__(self, consumer_key=None, consumer_secret=None):
        self.consumer_secret = consumer_secret
        self.consumer_key = consumer_key

    @gen.engine
    def twitter_request(self, path, callback, access_token=None,
                        post_args=None, **args):
        """Fetches an API path, e.g. "/statuses/user_timeline/btaylor".

        Leave the format off the path: ".json" is appended automatically
        and the response body is decoded as JSON before being handed to
        callback. If the fetch reports an error, a warning is logged and
        callback receives None instead.

        Pass post_args to make the request a POST (they are sent
        url-encoded in the body); any extra keyword arguments end up in
        the query string.

        The Twitter methods are documented at
        http://apiwiki.twitter.com/Twitter-API-Documentation.

        When access_token is given, the request is signed: the OAuth
        parameters are computed over the query and POST arguments and
        added to the query string. Example usage::

            twitter = Twitter(consumer_key, consumer_secret)

            def on_post(new_entry):
                if not new_entry:
                    # Call failed; perhaps missing permission?
                    return
                print("Posted a message!")

            twitter.twitter_request(
                "/statuses/update",
                post_args={"status": "Testing Tornado Web Server"},
                access_token=user["access_token"],
                callback=on_post)
        """
        url = "http://api.twitter.com/1" + path + ".json"
        is_post = post_args is not None

        # Sign the request only when credentials were supplied.
        if access_token:
            signed_args = dict(args)
            signed_args.update(post_args or {})
            oauth = self._oauth_request_parameters(
                url, access_token, signed_args,
                method="POST" if is_post else "GET")
            args.update(oauth)

        if args:
            url = url + "?" + urllib.urlencode(args)

        client = httpclient.AsyncHTTPClient()
        fetch_options = {}
        if is_post:
            fetch_options["method"] = "POST"
            fetch_options["body"] = urllib.urlencode(post_args)
        response = yield gen.Task(client.fetch, url, **fetch_options)

        if not response.error:
            callback(escape.json_decode(response.body))
            return
        logging.warning("Error response %s fetching %s", response.error,
                        response.request.url)
        callback(None)

    def _oauth_consumer_token(self):
        """Returns the consumer credentials as a {"key", "secret"} dict.

        Objects that expose require_setting read the credentials from
        self.settings; otherwise the values given to the constructor are
        used. Raises Exception when neither source provides both values.
        """
        if hasattr(self, "require_setting"):
            for setting in ("twitter_consumer_key", "twitter_consumer_secret"):
                self.require_setting(setting, "Twitter OAuth")
            return {
                "key": self.settings["twitter_consumer_key"],
                "secret": self.settings["twitter_consumer_secret"],
            }
        if not (self.consumer_key and self.consumer_secret):
            raise Exception("No consumer key and secret passed to make request")
        return {"key": self.consumer_key, "secret": self.consumer_secret}
def _oauth_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process

    consumer_token (and token, when given) are dicts whose "secret" entry
    forms the HMAC key. parameters maps argument names to values; None is
    treated as an empty mapping. Only the scheme, host and path of url
    take part in the signature. Returns the base64-encoded digest without
    the trailing newline.
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    def escape_value(value):
        # _oauth_escape() UTF-8 encodes unicode itself; pushing unicode
        # through str() first would raise UnicodeEncodeError on non-ASCII
        # text. Everything else (ints, byte strings, ...) still uses str().
        if not isinstance(value, unicode):
            value = str(value)
        return _oauth_escape(value)

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (k, escape_value(v))
                 for k, v in sorted((parameters or {}).items())),
    ]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    # OAuth 1.0 key: the raw secrets joined by "&" (empty token secret when
    # no token is supplied).
    key_elems = [
        escape.utf8(consumer_token["secret"]),
        escape.utf8(token["secret"] if token else ""),
    ]
    key = b("&").join(key_elems)

    digest = hmac.new(key, escape.utf8(base_string), hashlib.sha1).digest()
    # b2a_base64 appends a newline; strip it.
    return binascii.b2a_base64(digest)[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process

    consumer_token (and token, when given) are dicts whose "secret" entry
    forms the HMAC key; unlike the 1.0 variant, each secret is
    percent-encoded before being joined. parameters maps argument names to
    values; None is treated as an empty mapping. Only the scheme, host and
    path of url take part in the signature. Returns the base64-encoded
    digest without the trailing newline.
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    def escape_value(value):
        # _oauth_escape() UTF-8 encodes unicode itself; pushing unicode
        # through str() first would raise UnicodeEncodeError on non-ASCII
        # text. Everything else (ints, byte strings, ...) still uses str().
        if not isinstance(value, unicode):
            value = str(value)
        return _oauth_escape(value)

    base_elems = [
        method.upper(),
        normalized_url,
        "&".join("%s=%s" % (k, escape_value(v))
                 for k, v in sorted((parameters or {}).items())),
    ]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    # 1.0a key: percent-encoded secrets joined by "&". _oauth_escape() is
    # the same quote(..., safe="~") call as before, but also copes with
    # unicode secrets instead of failing inside urllib.quote.
    key_elems = [
        escape.utf8(_oauth_escape(consumer_token["secret"])),
        escape.utf8(_oauth_escape(token["secret"]) if token else ""),
    ]
    key = b("&").join(key_elems)

    digest = hmac.new(key, escape.utf8(base_string), hashlib.sha1).digest()
    # b2a_base64 appends a newline; strip it.
    return binascii.b2a_base64(digest)[:-1]
def _oauth_escape(val):
    """Percent-encodes val for use in an OAuth base string.

    Unicode input is UTF-8 encoded first. Only "~" is passed as an extra
    safe character to urllib.quote, so "/" is escaped as well.
    """
    encoded = val.encode("utf-8") if isinstance(val, unicode) else val
    return urllib.quote(encoded, safe="~")
def _oauth_parse_response(body):
    """Turns a form-encoded OAuth token response into a token dict.

    The result has "key" (from oauth_token) and "secret" (from
    oauth_token_secret), plus the first value of every other non-blank
    parameter found in body, stored under that parameter's own name.
    """
    parsed = escape.parse_qs(body, keep_blank_values=False)
    token_field = b("oauth_token")
    secret_field = b("oauth_token_secret")
    token = {
        "key": parsed[token_field][0],
        "secret": parsed[secret_field][0],
    }
    # Carry over any extra parameters the Provider sent with the token.
    for name in parsed:
        if name not in (token_field, secret_field):
            token[name] = parsed[name][0]
    return token
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment