Created
January 10, 2012 02:56
-
-
Save joerussbowman/1586593 to your computer and use it in GitHub Desktop.
The direction I am considering for auth.py in Tornado.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii | |
import hashlib | |
import hmac | |
import logging | |
import time | |
import urllib | |
import urlparse | |
import uuid | |
from tornado import httpclient | |
from tornado import escape | |
from tornado import gen | |
from tornado.util import bytes_type, b | |
class OAuth(object):
    """Abstract implementation of OAuth request signing.

    Meant to be built upon for making signed requests without having to be
    a mixin for a request handler.  For example, this makes it possible to
    call ``twitter_request`` from somewhere other than a request.

    Subclasses must provide ``_oauth_consumer_token()`` returning a dict
    with ``"key"`` and ``"secret"`` entries.  They may set
    ``_OAUTH_VERSION`` to choose the signing routine; when the attribute is
    absent, "1.0a" is assumed.

    See ``Twitter`` below for an example implementation.
    """

    def _oauth_request_parameters(self, url, access_token, parameters=None,
                                  method="GET"):
        """Return the OAuth parameters as a dict for the given request.

        ``parameters`` should include all POST arguments and query string
        arguments that will be sent with the request.  They are covered by
        the signature but are not part of the returned dict.

        ``access_token`` is a dict whose ``"key"`` entry is sent as
        ``oauth_token``; the whole dict is handed to the signing routine.
        ``method`` is the HTTP method used for the signature.
        """
        consumer_token = self._oauth_consumer_token()
        # _OAUTH_VERSION only selects the signing routine.  The value sent
        # on the wire is fixed: the OAuth spec requires oauth_version to be
        # "1.0" whenever it is present, including for the 1.0a revision.
        signing_version = getattr(self, "_OAUTH_VERSION", "1.0a")
        base_args = dict(
            oauth_consumer_key=consumer_token["key"],
            oauth_token=access_token["key"],
            oauth_signature_method="HMAC-SHA1",
            oauth_timestamp=str(int(time.time())),
            oauth_nonce=binascii.b2a_hex(uuid.uuid4().bytes),
            oauth_version="1.0",
        )
        # The signature covers the OAuth fields plus the request arguments;
        # on a key clash the request arguments take precedence.
        args = dict(base_args)
        args.update(parameters or {})
        if signing_version == "1.0a":
            sign = _oauth10a_signature
        else:
            sign = _oauth_signature
        base_args["oauth_signature"] = sign(consumer_token, method, url, args,
                                            access_token)
        return base_args
class Twitter(OAuth):
    """Twitter API client built on the generic ``OAuth`` signing helper.

    The consumer key and secret can be handed to the constructor.  When the
    object also has a ``require_setting`` attribute, the credentials are
    read from ``self.settings`` instead (see ``_oauth_consumer_token``).
    """

    def __init__(self, consumer_key=None, consumer_secret=None):
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret

    @gen.engine
    def twitter_request(self, path, callback, access_token=None,
                        post_args=None, **args):
        """Fetch an API path such as "/statuses/user_timeline/btaylor".

        Do not include the format in ``path``: ".json" is appended
        automatically and the JSON response is decoded before it is passed
        to ``callback``.  On an error response a warning is logged and
        ``callback`` receives ``None``.

        Supply ``post_args`` to make the request a POST.  Query string
        arguments are given as keyword arguments.

        All the Twitter methods are documented at
        http://apiwiki.twitter.com/Twitter-API-Documentation.

        Many methods need an OAuth access token, which you can obtain
        through authorize_redirect() and get_authenticated_user().  The
        user returned by that process carries an 'access_token' attribute
        that can be passed to this method for authenticated requests.
        Example usage::

            class MainHandler(tornado.web.RequestHandler,
                              tornado.auth.TwitterMixin):
                @tornado.web.authenticated
                @tornado.web.asynchronous
                def get(self):
                    self.twitter_request(
                        "/statuses/update",
                        post_args={"status": "Testing Tornado Web Server"},
                        access_token=user["access_token"],
                        callback=self.async_callback(self._on_post))

                def _on_post(self, new_entry):
                    if not new_entry:
                        # Call failed; perhaps missing permission?
                        self.authorize_redirect()
                        return
                    self.finish("Posted a message!")
        """
        is_post = post_args is not None
        url = "http://api.twitter.com/1" + path + ".json"

        # Sign the request only when credentials were supplied.
        if access_token:
            signed_args = dict(args)
            signed_args.update(post_args or {})
            http_method = "POST" if is_post else "GET"
            args.update(self._oauth_request_parameters(
                url, access_token, signed_args, method=http_method))

        if args:
            url += "?" + urllib.urlencode(args)

        client = httpclient.AsyncHTTPClient()
        if is_post:
            response = yield gen.Task(client.fetch, url, method="POST",
                                      body=urllib.urlencode(post_args))
        else:
            response = yield gen.Task(client.fetch, url)

        if response.error:
            logging.warning("Error response %s fetching %s", response.error,
                            response.request.url)
            callback(None)
        else:
            callback(escape.json_decode(response.body))

    def _oauth_consumer_token(self):
        """Return the consumer credentials as a ``key``/``secret`` dict.

        Objects exposing ``require_setting`` take the values from
        ``self.settings``; otherwise the constructor arguments are used.
        An ``Exception`` is raised when neither source yields both values.
        """
        if hasattr(self, "require_setting"):
            for setting in ("twitter_consumer_key",
                            "twitter_consumer_secret"):
                self.require_setting(setting, "Twitter OAuth")
            return {
                "key": self.settings["twitter_consumer_key"],
                "secret": self.settings["twitter_consumer_secret"],
            }
        if self.consumer_key and self.consumer_secret:
            return {
                "key": self.consumer_key,
                "secret": self.consumer_secret,
            }
        raise Exception("No consumer key and secret passed to make request")
def _oauth_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculate the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process

    ``consumer_token`` and ``token`` are dicts with a ``"secret"`` entry;
    ``token`` may be omitted, in which case an empty token secret is used.
    ``parameters`` maps every request parameter to its value.  Returns the
    base64-encoded signature without a trailing newline.
    """
    parameters = parameters or {}
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    def _as_text(value):
        # Pass strings through untouched so _oauth_escape can UTF-8 encode
        # unicode values; calling str() on non-ASCII unicode would raise
        # UnicodeEncodeError.  Anything else (ints, ...) is stringified.
        if isinstance(value, (bytes_type, unicode)):
            return value
        return str(value)

    normalized_params = "&".join(
        "%s=%s" % (k, _oauth_escape(_as_text(v)))
        for k, v in sorted(parameters.items()))
    base_elems = [method.upper(), normalized_url, normalized_params]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    # The signing key is "<consumer secret>&<token secret>".
    key_elems = [escape.utf8(consumer_token["secret"]),
                 escape.utf8(token["secret"] if token else "")]
    key = b("&").join(key_elems)

    digest = hmac.new(key, escape.utf8(base_string), hashlib.sha1).digest()
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(digest)[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculate the HMAC-SHA1 OAuth 1.0a signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process

    ``consumer_token`` and ``token`` are dicts with a ``"secret"`` entry;
    ``token`` may be omitted, in which case an empty token secret is used.
    ``parameters`` maps every request parameter to its value.  Unlike
    ``_oauth_signature``, both secrets are percent-encoded before they are
    joined into the signing key.  Returns the base64-encoded signature
    without a trailing newline.
    """
    parameters = parameters or {}
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    def _as_text(value):
        # Pass strings through untouched so _oauth_escape can UTF-8 encode
        # unicode values; calling str() on non-ASCII unicode would raise
        # UnicodeEncodeError.  Anything else (ints, ...) is stringified.
        if isinstance(value, (bytes_type, unicode)):
            return value
        return str(value)

    normalized_params = "&".join(
        "%s=%s" % (k, _oauth_escape(_as_text(v)))
        for k, v in sorted(parameters.items()))
    base_elems = [method.upper(), normalized_url, normalized_params]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    # The signing key is "<escaped consumer secret>&<escaped token secret>".
    # _oauth_escape is the same quote(..., safe="~") call as before, but it
    # UTF-8 encodes unicode secrets first.
    key_elems = [
        escape.utf8(_oauth_escape(consumer_token["secret"])),
        escape.utf8(_oauth_escape(token["secret"]) if token else ""),
    ]
    key = b("&").join(key_elems)

    digest = hmac.new(key, escape.utf8(base_string), hashlib.sha1).digest()
    # b2a_base64 appends a newline; drop it.
    return binascii.b2a_base64(digest)[:-1]
def _oauth_escape(val): | |
if isinstance(val, unicode): | |
val = val.encode("utf-8") | |
return urllib.quote(val, safe="~") | |
def _oauth_parse_response(body):
    """Turn a form-encoded OAuth token response into a token dict.

    The result maps ``"key"`` to the first ``oauth_token`` value and
    ``"secret"`` to the first ``oauth_token_secret`` value.  Every other
    field in the response is copied over under its own name, using its
    first value.  Blank values are dropped while parsing.  A ``KeyError``
    is raised if either of the two token fields is missing.
    """
    fields = escape.parse_qs(body, keep_blank_values=False)
    token_field = b("oauth_token")
    secret_field = b("oauth_token_secret")
    token = {
        "key": fields[token_field][0],
        "secret": fields[secret_field][0],
    }
    # Add the extra parameters the Provider included to the token
    for name in fields:
        if name not in (token_field, secret_field):
            token[name] = fields[name][0]
    return token
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment