Skip to content

Instantly share code, notes, and snippets.

@gorakhargosh
Created June 20, 2011 18:15
Show Gist options
  • Save gorakhargosh/1036192 to your computer and use it in GitHub Desktop.
Save gorakhargosh/1036192 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2009 Facebook
# Copyright (C) 2010, 2011 Tipfy.org
# Copyright (C) 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Keep this as close to the Facebook Tornado code as possible.
# Any changes upstream will be easier to merge in here as a result.
"""
Implementations of various third-party authentication schemes.
All the classes in this file are class mixins designed to be used
with the webapp-improved RequestHandler class. The primary methods for each
service are authenticate_redirect(), authorize_redirect(), and
get_authenticated_user(). The former should be called to redirect the user to,
e.g., the OpenID authentication page on the third party service, and the latter
should be called upon return to get the user data from the data returned by the
third party service.
They all take slightly different arguments due to the fact that all these
services implement authentication and authorization slightly differently.
See the individual service classes below for complete documentation.
Example usage for Google OpenID::
class GoogleHandler(RequestHandler, GoogleMixin):
def get(self):
if self.request.get('openid.mode', None):
self.get_authenticated_user(self._on_auth)
return
self.authenticate_redirect()
def _on_auth(self, user):
if not user:
raise ValueError("Google Authentication failed.")
# Save the user with ... self.session_store.set_secure_cookie()
"""
#import base64
import binascii
import cgi
import functools
import hashlib
import hmac
import logging
import time
import urllib
import urlparse
import uuid
from google.appengine.api import urlfetch
class OpenIdMixin(object):
    """
    Abstract implementation of OpenID and Attribute Exchange.

    See GoogleMixin for example implementation.

    Subclasses must define ``_OPENID_ENDPOINT``. The methods read
    ``self.request`` and call ``self.redirect()``, so this class is meant to
    be mixed into a request handler that provides both.
    """
    # Implement this in subclasses.
    #_OPENID_ENDPOINT = None

    @staticmethod
    def _last_value(value):
        """
        Returns the last value of a request argument.

        Some request containers map a name to a list of values, others yield
        one plain string per name. Indexing a plain string with ``[-1]``
        would silently return its last character, so only real sequences are
        indexed here.

        :param value:
            Either a list/tuple of values or a single value.
        :returns:
            The last element of a list/tuple (``u""`` when it is empty), or
            ``value`` itself otherwise.
        """
        if isinstance(value, (list, tuple)):
            return value[-1] if value else u""
        return value

    def authenticate_redirect(self, callback_uri=None, ax_attrs=None):
        """
        Redirects to the authentication URL for this service.

        After authentication, the service will redirect back to the given
        callback URI.

        We request the given attributes for the authenticated user by default
        (name, email, language, and username). If you don't need all those
        attributes for your app, you can request fewer with the ax_attrs
        keyword argument.

        :param callback_uri:
            The URL to redirect to after authentication. Defaults to the
            path of the current request.
        :param ax_attrs:
            List of Attribute Exchange attributes to be fetched.
        :returns:
            None
        """
        ax_attrs = ax_attrs or ["name", "email", "language", "username"]
        request_uri = self.request.path
        callback_uri = callback_uri or request_uri
        args = self._openid_args(callback_uri, ax_attrs=ax_attrs)
        self.redirect(self._OPENID_ENDPOINT + "?" + urllib.urlencode(args))

    def get_authenticated_user(self, callback):
        """
        Fetches the authenticated user data upon redirect.

        This method should be called by the handler that handles the callback
        URL to which the service redirects when the authenticate_redirect()
        or authorize_redirect() methods are called.

        :param callback:
            A function that is called after the authentication attempt. It is
            called passing a dictionary with the requested user attributes or
            None if the authentication failed.
        """
        request_arguments = self.request.params
        # Verify the OpenID response via direct request to the OP: echo back
        # every received argument with the mode switched.
        args = {}
        for key, value in request_arguments.iteritems():
            value = self._last_value(value)
            if isinstance(value, unicode):
                # urllib.urlencode() calls str() on values, which raises for
                # non-ASCII unicode (e.g. a user's full name).
                value = value.encode("utf-8")
            args[key] = value
        args["openid.mode"] = u"check_authentication"
        url = self._OPENID_ENDPOINT
        # Only the network call is guarded, so an exception raised by the
        # callback is never mistaken for a failed fetch.
        try:
            response = urlfetch.fetch(url, deadline=10, method=urlfetch.POST,
                                      payload=urllib.urlencode(args))
        except urlfetch.DownloadError as e:
            logging.exception(e)
            response = None
        # Status-code and payload validation happen in the handler below.
        self._on_authentication_verified(callback, response)

    def _openid_args(self, callback_uri, ax_attrs=None, oauth_scope=None):
        """
        Builds and returns the OpenID arguments used in the authentication
        request.

        :param callback_uri:
            The URL to redirect to after authentication. It is resolved
            against the URL of the current request.
        :param ax_attrs:
            List of Attribute Exchange attributes to be fetched. Supported
            names are "name", "email", "language" and "username".
        :param oauth_scope:
            OAuth scope.
        :returns:
            A dictionary of arguments for the authentication URL.
        :raises KeyError:
            If ``ax_attrs`` contains an unsupported attribute name.
        """
        ax_attrs = ax_attrs or []
        url = urlparse.urljoin(self.request.url, callback_uri)
        request_host = self.request.host
        request_protocol = self.request.scheme
        args = {
            "openid.ns": "http://specs.openid.net/auth/2.0",
            "openid.claimed_id":
                "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.identity":
                "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.return_to": url,
            "openid.realm": request_protocol + "://" + request_host + "/",
            "openid.mode": "checkid_setup",
        }
        if ax_attrs:
            args.update({
                "openid.ns.ax": "http://openid.net/srv/ax/1.0",
                "openid.ax.mode": "fetch_request",
            })
            ax_attrs = set(ax_attrs)
            required = []
            if "name" in ax_attrs:
                # "name" expands to the three name-related AX types.
                ax_attrs -= set(["name", "firstname", "fullname", "lastname"])
                required += ["firstname", "fullname", "lastname"]
                args.update({
                    "openid.ax.type.firstname":
                        "http://axschema.org/namePerson/first",
                    "openid.ax.type.fullname":
                        "http://axschema.org/namePerson",
                    "openid.ax.type.lastname":
                        "http://axschema.org/namePerson/last",
                })
            known_attrs = {
                "email": "http://axschema.org/contact/email",
                "language": "http://axschema.org/pref/language",
                "username": "http://axschema.org/namePerson/friendly",
            }
            for name in ax_attrs:
                args["openid.ax.type." + name] = known_attrs[name]
                required.append(name)
            args["openid.ax.required"] = ",".join(required)
        if oauth_scope:
            args.update({
                "openid.ns.oauth":
                    "http://specs.openid.net/extensions/oauth/1.0",
                # The consumer is the request host without its port.
                "openid.oauth.consumer": request_host.split(":")[0],
                "openid.oauth.scope": oauth_scope,
            })
        return args

    def _on_authentication_verified(self, callback, response):
        """
        Called after the authentication attempt. It calls the callback
        function set when the authentication process started, passing a
        dictionary of user data if the authentication was successful or None
        if it failed.

        :param callback:
            A function that is called after the authentication attempt.
        :param response:
            The response of the ``check_authentication`` request, or None if
            the request could not be made.
        """
        if not response:
            logging.warning("Missing OpenID response.")
            callback(None)
            return
        elif response.status_code < 200 or response.status_code >= 300 or \
                "is_valid:true" not in response.content:
            # A positive check_authentication answer carries the line
            # "is_valid:true"; anything else is a failed verification.
            logging.warning("Invalid OpenID response (%d): %s",
                            response.status_code, response.content)
            callback(None)
            return

        request_arguments = self.request.params
        claimed_id = self.request.get("openid.claimed_id", u"")

        # Find the namespace alias used for Attribute Exchange.
        ax_ns = None
        for arg_name, values in request_arguments.iteritems():
            if arg_name.startswith("openid.ns.") and \
                    self._last_value(values) == \
                    u"http://openid.net/srv/ax/1.0":
                ax_ns = arg_name[10:]  # strip the "openid.ns." prefix
                break

        email = self._get_ax_arg("http://axschema.org/contact/email", ax_ns)
        name = self._get_ax_arg("http://axschema.org/namePerson", ax_ns)
        first_name = self._get_ax_arg("http://axschema.org/namePerson/first",
                                      ax_ns)
        last_name = self._get_ax_arg("http://axschema.org/namePerson/last",
                                     ax_ns)
        username = self._get_ax_arg("http://axschema.org/namePerson/friendly",
                                    ax_ns)
        locale = self._get_ax_arg("http://axschema.org/pref/language",
                                  ax_ns).lower()

        user = dict()
        name_parts = []
        if first_name:
            user["first_name"] = first_name
            name_parts.append(first_name)
        if last_name:
            user["last_name"] = last_name
            name_parts.append(last_name)
        # Prefer the full name, then first + last, then the email local part.
        if name:
            user["name"] = name
        elif name_parts:
            user["name"] = u" ".join(name_parts)
        elif email:
            user["name"] = email.split("@")[0]
        if email:
            user["email"] = email
        if locale:
            user["locale"] = locale
        if username:
            user["username"] = username
        # Get the claimed ID. Not in facebook code. Borrowed from Tipfy.
        user["claimed_id"] = claimed_id
        callback(user)

    def _get_ax_arg(self, uri, ax_ns):
        """
        Returns an Attribute Exchange value from the request.

        :param uri:
            Attribute Exchange URI.
        :param ax_ns:
            Attribute Exchange namespace.
        :returns:
            The Attribute Exchange value, if found in the request; an empty
            unicode string otherwise.
        """
        request_arguments = self.request.params
        if not ax_ns:
            return u""
        prefix = "openid." + ax_ns + ".type."
        ax_name = None
        # Locate the ".type.<alias>" argument that declares this URI, then
        # read the matching ".value.<alias>" argument.
        for arg_name, values in request_arguments.iteritems():
            if self._last_value(values) == uri and \
                    arg_name.startswith(prefix):
                part = arg_name[len(prefix):]
                ax_name = "openid." + ax_ns + ".value." + part
                break
        if not ax_name:
            return u""
        return self.request.get(ax_name, u"")
class OAuthMixin(object):
    """
    Abstract implementation of OAuth (versions 1.0 and 1.0a).

    See TwitterMixin and FriendFeedMixin for example implementations.

    The mixin reads ``self.request``, ``self.response`` and
    ``self.session_store`` and calls ``self.redirect()`` / ``self.abort()``,
    so it has to be combined with a request handler providing them.
    """
    # Define these in the subclasses.
    #_OAUTH_VERSION = "1.0a"
    #_OAUTH_AUTHORIZE_URL = None
    #_OAUTH_ACCESS_TOKEN_URL = None
    #_OAUTH_REQUEST_TOKEN_URL = None
    #_OAUTH_NO_CALLBACKS = False

    def authorize_redirect(self, callback_uri=None, extra_params=None):
        """
        Redirects the user to obtain OAuth authorization for this service.

        Twitter and FriendFeed both require that you register a callback
        URL with your application. Call this method to log the user in, and
        then call get_authenticated_user() in the handler registered as the
        callback URL to complete the authorization process.

        A cookie called _oauth_request_token is set here; it is read (and
        cleared) again in get_authenticated_user for security purposes.

        :param callback_uri:
            Callback URI.
        :param extra_params:
            Extra parameters; only forwarded for OAuth 1.0a.
        """
        if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
            raise Exception("This service does not support oauth_callback")

        uses_10a = getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a"
        if uses_10a:
            token_url = self._oauth_request_token_url(
                callback_uri=callback_uri, extra_params=extra_params)
        else:
            token_url = self._oauth_request_token_url()

        try:
            token_response = urlfetch.fetch(token_url, deadline=10)
        except urlfetch.DownloadError as error:
            logging.exception(error)
            token_response = None

        self._on_request_token(self._OAUTH_AUTHORIZE_URL, callback_uri,
                               token_response)

    def get_authenticated_user(self, callback):
        """
        Gets the OAuth authorized user and access token on callback.

        Call this from the handler for your registered OAuth callback URL to
        complete the registration process. ``callback`` receives the user
        data, or None when the request token cookie is missing, does not
        match the ``oauth_token`` request argument, or the access token
        could not be fetched.

        :param callback:
            The callback that will be called with the outcome.
        """
        returned_key = self.request.get("oauth_token")
        verifier = self.request.get("oauth_verifier", None)
        #request_cookie = self.request.cookies.get("_oauth_request_token")
        stored_token = self.session_store.get_secure_cookie(
            "_oauth_request_token")
        if not stored_token:
            logging.warning("Missing OAuth request token cookie")
            callback(None)
            return

        # The request token is single-use: drop the cookie right away.
        #self.session_store.delete_cookie("_oauth_request_token")
        self.response.delete_cookie("_oauth_request_token")

        stored_key = stored_token["key"]
        stored_secret = stored_token["secret"]
        if stored_key != returned_key:
            logging.warning("Request token does not match cookie")
            callback(None)
            return

        token = {"key": stored_key, "secret": stored_secret}
        if verifier:
            token["verifier"] = verifier

        try:
            access_url = self._oauth_access_token_url(token)
            access_response = urlfetch.fetch(access_url, deadline=10)
        except urlfetch.DownloadError as error:
            logging.exception(error)
            access_response = None

        self._on_access_token(callback, access_response)

    def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
        """
        Builds the signed OAuth request token URL.

        :param callback_uri:
            Where the service will redirect after returning a request token.
            Only used for OAuth 1.0a.
        :param extra_params:
            Extra query parameters to send along with the request. Only
            used for OAuth 1.0a.
        :returns:
            A properly encoded request token URL.
        """
        current_url = self.request.url
        version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer = self._oauth_consumer_token()
        endpoint = self._OAUTH_REQUEST_TOKEN_URL
        query = {
            "oauth_consumer_key": consumer["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": version,
        }
        if version != "1.0a":
            query["oauth_signature"] = _oauth_signature(
                consumer, "GET", endpoint, query)
        else:
            if callback_uri:
                query["oauth_callback"] = urlparse.urljoin(current_url,
                                                           callback_uri)
            if extra_params:
                query.update(extra_params)
            query["oauth_signature"] = _oauth10a_signature(
                consumer, "GET", endpoint, query)
        return endpoint + "?" + urllib.urlencode(query)

    def _on_request_token(self, authorize_url, callback_uri, response):
        """
        Handles the request token response and redirects to the provider.

        Aborts with status 500 when the response is missing or is not a 2xx.
        Otherwise stores the request token in the ``_oauth_request_token``
        secure cookie and redirects to ``authorize_url``.

        :param authorize_url:
            The authorization URL to redirect to.
        :param callback_uri:
            The callback URI.
        :param response:
            OAuth response, or None if the request failed.
        :returns:
            None
        """
        current_url = self.request.url
        if not response:
            logging.warning("Could not get OAuth request token.")
            self.abort(500)
        elif not (200 <= response.status_code < 300):
            logging.warning(
                "Bad OAuth response when requesting a token (%d): %s",
                response.status_code, response.content)
            self.abort(500)

        request_token = _oauth_parse_response(response.content)
        #data = "|".join([
        #    base64.b64encode(request_token["key"]),
        #    base64.b64encode(request_token["secret"]),
        #    ])
        cookie_data = {
            "key": request_token["key"],
            "secret": request_token["secret"],
        }
        self.session_store.set_secure_cookie("_oauth_request_token",
                                             cookie_data)

        query = {"oauth_token": request_token["key"]}
        if callback_uri:
            query["oauth_callback"] = urlparse.urljoin(current_url,
                                                       callback_uri)
        self.redirect(authorize_url + "?" + urllib.urlencode(query))

    def _oauth_access_token_url(self, request_token):
        """
        Builds the signed OAuth access token URL.

        :param request_token:
            The OAuth request token obtained in a previous step; a dict with
            "key", "secret" and optionally "verifier".
        :returns:
            Properly encoded access token URL.
        """
        version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer = self._oauth_consumer_token()
        endpoint = self._OAUTH_ACCESS_TOKEN_URL
        query = {
            "oauth_consumer_key": consumer["key"],
            "oauth_token": request_token["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": version,
        }
        if "verifier" in request_token:
            query["oauth_verifier"] = request_token["verifier"]

        if version == "1.0a":
            sign = _oauth10a_signature
        else:
            sign = _oauth_signature
        query["oauth_signature"] = sign(consumer, "GET", endpoint, query,
                                        request_token)
        return endpoint + "?" + urllib.urlencode(query)

    def _on_access_token(self, callback, response):
        """
        Handles the access token response.

        Calls ``callback(None)`` when the response is missing or is not a
        2xx; otherwise parses the access token and hands it to
        ``_oauth_get_user``.

        :param callback:
            Function receiving the user data or None.
        :param response:
            OAuth response, or None if the request failed.
        """
        if not response:
            logging.warning("Could not fetch access token")
            #self.abort(500)
            callback(None)
            return
        if not (200 <= response.status_code < 300):
            logging.warning(
                "Bad OAuth response trying to get access token (%d): %s",
                response.status_code, response.content)
            #self.abort(500)
            callback(None)
            return

        access_token = _oauth_parse_response(response.content)
        on_user = functools.partial(self._on_oauth_get_user, access_token,
                                    callback)
        self._oauth_get_user(access_token, on_user)

    def _oauth_consumer_token(self):
        """
        Returns the consumer token for this service handler.

        Needs to be overridden.
        """
        raise NotImplementedError()

    def _oauth_get_user(self, access_token, callback):
        """
        Obtains the user data from OAuth access token.

        Needs to be overridden.

        :param access_token:
            OAuth access token
        :param callback:
            Callback that will be given the data.
        """
        raise NotImplementedError()

    def _on_oauth_get_user(self, access_token, callback, user):
        """
        Attaches the access token to the fetched user and reports it.

        :param access_token:
            Access token.
        :param callback:
            Function receiving the user dict, or None when ``user`` is
            empty.
        :param user:
            User data returned by the service.
        """
        if user:
            user["access_token"] = access_token
            callback(user)
        else:
            callback(None)

    def _oauth_request_parameters(self, url, access_token, parameters=None,
                                  method="GET"):
        """
        Returns the OAuth parameters as a dict for the given request.

        ``parameters`` should include all POST arguments and query string
        arguments that will be sent with the request; they take part in the
        signature but are not included in the returned dict.

        :param url:
            Request URL.
        :param access_token:
            OAuth Access Token
        :param parameters:
            A dictionary of query parameters.
        :param method:
            HTTP Method.
        :returns:
            The ``oauth_*`` parameters including ``oauth_signature``.
        """
        if not parameters:
            parameters = {}
        version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer = self._oauth_consumer_token()
        oauth_args = {
            "oauth_consumer_key": consumer["key"],
            "oauth_token": access_token["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": version,
        }
        # Sign over the OAuth arguments plus the caller's own parameters.
        signed_args = dict(oauth_args)
        signed_args.update(parameters)
        if version == "1.0a":
            sign = _oauth10a_signature
        else:
            sign = _oauth_signature
        oauth_args["oauth_signature"] = sign(consumer, method, url,
                                             signed_args, access_token)
        return oauth_args
class OAuth2Mixin(object):
    """
    Abstract implementation of OAuth v 2.

    Subclasses provide ``_OAUTH_AUTHORIZE_URL`` and
    ``_OAUTH_REQUEST_TOKEN_URL``; the host handler provides
    ``self.redirect()``.
    """

    def authorize_redirect(self, redirect_uri=None, client_id=None,
                           client_secret=None, extra_params=None):
        """
        Redirects the user to obtain OAuth authorization for this service.

        Some providers require that you register a callback URL with your
        application. Call this method to log the user in, and then call
        get_authenticated_user() in the handler registered as the callback
        URL to complete the authorization process.

        :param redirect_uri:
            Sent as the ``redirect_uri`` query argument.
        :param client_id:
            Sent as the ``client_id`` query argument.
        :param client_secret:
            Accepted but not used by this method.
        :param extra_params:
            Optional dict of additional query arguments.
        """
        query = {
            "redirect_uri": redirect_uri,
            "client_id": client_id,
        }
        if extra_params:
            query.update(extra_params)
        target = _url_concat(self._OAUTH_AUTHORIZE_URL, query)
        self.redirect(target)

    def _oauth_request_token_url(self, redirect_uri=None, client_id=None,
                                 client_secret=None, code=None,
                                 extra_params=None):
        """
        Obtains the OAuth request token URL.

        :param redirect_uri:
            Sent as the ``redirect_uri`` query argument.
        :param client_id:
            Sent as the ``client_id`` query argument.
        :param client_secret:
            Sent as the ``client_secret`` query argument.
        :param code:
            Sent as the ``code`` query argument.
        :param extra_params:
            Optional dict of additional query arguments.
        :returns:
            Properly encoded request token URL.
        """
        endpoint = self._OAUTH_REQUEST_TOKEN_URL
        query = {
            "redirect_uri": redirect_uri,
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        if extra_params:
            query.update(extra_params)
        return _url_concat(endpoint, query)
def _oauth_signature(consumer_token, method, url, parameters=None, token=None):
    """
    Calculates the HMAC-SHA1 OAuth signature for the given request.

    The signing key is the consumer secret and the token secret (empty when
    no token is given) joined by "&", without escaping.

    See http://oauth.net/core/1.0/#signing_process

    :param consumer_token:
        Dict holding the consumer "secret".
    :param method:
        HTTP method.
    :param url:
        Request URL.
    :param parameters:
        Request parameters to sign.
    :param token:
        Optional dict holding the token "secret".
    :returns:
        Base64-encoded HMAC-SHA1 signature.
    """
    if not parameters:
        parameters = {}
    signature_base = _oauth_get_signature_base_string(url, method, parameters)
    consumer_secret = consumer_token["secret"]
    if token:
        token_secret = token["secret"]
    else:
        token_secret = ""
    signing_key = "&".join((consumer_secret, token_secret))
    return _oauth_hmac_sha1_base64_digest(signing_key, signature_base)
def _oauth10a_signature(consumer_token, method, url, parameters=None,
                        token=None):
    """
    Calculates the HMAC-SHA1 OAuth signature for the given request.

    The signing key is the escaped consumer secret and the escaped token
    secret (empty when no token is given) joined by "&".

    See http://oauth.net/core/1.0a/#signing_process

    :param consumer_token:
        Dict holding the consumer "secret".
    :param method:
        HTTP method.
    :param url:
        Request URL.
    :param parameters:
        Request parameters to sign.
    :param token:
        Optional dict holding the token "secret".
    :returns:
        Base64-encoded HMAC-SHA1 signature.
    """
    if not parameters:
        parameters = {}
    signature_base = _oauth_get_signature_base_string(url, method, parameters)
    consumer_secret = _oauth_escape(consumer_token["secret"])
    if token:
        token_secret = _oauth_escape(token["secret"])
    else:
        token_secret = ""
    signing_key = "&".join((consumer_secret, token_secret))
    return _oauth_hmac_sha1_base64_digest(signing_key, signature_base)
def _oauth_hmac_sha1_base64_digest(key, base_string):
    """
    Calculates a base-64 encoded HMAC-SHA1 digest for the given
    OAuth key and base string.

    Text (unicode) arguments are encoded as UTF-8 before hashing, because
    ``hmac`` operates on byte strings only.

    :param key:
        Signing key (consumer secret and token secret joined by "&").
    :param base_string:
        OAuth signature base string.
    :returns:
        Base64-encoded HMAC-SHA1 digest.
    """
    if not isinstance(key, bytes):
        key = key.encode("utf-8")
    if not isinstance(base_string, bytes):
        base_string = base_string.encode("utf-8")
    hashed = hmac.new(key, base_string, hashlib.sha1)
    # b2a_base64() appends a newline; drop it.
    return binascii.b2a_base64(hashed.digest())[:-1]
def _oauth_get_signature_base_string(url, method, parameters):
    """
    Calculates an OAuth base string from the given URL, method, and
    parameters.

    Parameter names and values are both percent-encoded and then sorted, as
    the OAuth signing rules require. String values are passed to the
    escaping helper untouched so that non-ASCII unicode is encoded as UTF-8
    instead of failing in ``str()``; other values are converted with
    ``str()``.

    :param url:
        The URL.
    :param method:
        HTTP Method.
    :param parameters:
        Query string parameters.
    :returns:
        OAuth request base string.
    """
    def to_text(value):
        # Only non-string values (ints, ...) need converting.
        return value if isinstance(value, basestring) else str(value)

    normalized_url = _oauth_get_normalized_url(url)
    encoded_pairs = sorted(
        (_oauth_escape(to_text(k)), _oauth_escape(to_text(v)))
        for k, v in parameters.items())
    query_string = "&".join("%s=%s" % pair for pair in encoded_pairs)
    base_elems = [method.upper(), normalized_url, query_string]
    return "&".join(_oauth_escape(e) for e in base_elems)
def _oauth_get_normalized_url(url):
    """
    Normalizes a URL that will be used in the oauth signature.

    The result consists of the lower-cased scheme and authority followed by
    the path. Query string and fragment are dropped, and the default port
    (80 for http, 443 for https) is removed from the authority.

    Doctests::

        >>> _oauth_get_normalized_url("HTTP://Example.com:80/a?b=c")
        'http://example.com/a'

    :param url:
        The URL to normalize.
    :returns:
        Normalized URL.
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    scheme = scheme.lower()
    netloc = netloc.lower()
    # The signature base string must not contain the scheme's default port.
    default_port = {"http": ":80", "https": ":443"}.get(scheme)
    if default_port and netloc.endswith(default_port):
        netloc = netloc[:-len(default_port)]
    return scheme + "://" + netloc + path
def _oauth_escape(val):
    """
    Escapes the value of a query string parameter according to the OAuth
    spec.

    Unicode input is encoded as UTF-8 before being quoted.

    :param val:
        Query string parameter value to escape.
    :returns:
        String representing escaped value.
    """
    raw = val.encode("utf-8") if isinstance(val, unicode) else val
    return _oauth_quote(raw)
def _oauth_quote(val, safe="~"):
    """
    Percent-quotes a URL value as per the OAuth spec.

    :param val:
        The value to quote.
    :param safe:
        Characters that are left unquoted; only "~" by default.
    :returns:
        Quoted value.
    """
    quoted = urllib.quote(val, safe)
    return quoted
def _oauth_generate_nonce():
    """
    Calculates an OAuth nonce.

    :returns:
        OAuth nonce string: the 16 bytes of a random (version 4) UUID,
        hex-encoded.
    """
    random_bytes = uuid.uuid4().bytes
    return binascii.hexlify(random_bytes)
def _oauth_generate_timestamp():
    """
    Returns a string timestamp value according to the OAuth spec.

    :returns:
        The current time in whole seconds since the epoch, as a string.
    """
    seconds_since_epoch = int(time.time())
    return str(seconds_since_epoch)
def _oauth_parse_response(body):
    """
    Parses token out of the OAuth response body.

    ``oauth_token`` and ``oauth_token_secret`` are returned under the keys
    "key" and "secret". Every other parameter of the response is copied
    under its own name. For repeated parameters the first value is used;
    parameters with blank values are ignored.

    :param body:
        The OAuth response body (a URL-encoded query string).
    :returns:
        A token as a dictionary.
    :raises KeyError:
        If the body lacks ``oauth_token`` or ``oauth_token_secret``.
    """
    # urlparse.parse_qs replaces the deprecated cgi.parse_qs.
    p = urlparse.parse_qs(body, keep_blank_values=False)
    token = dict(key=p["oauth_token"][0], secret=p["oauth_token_secret"][0])
    special = ("oauth_token", "oauth_token_secret")
    token.update((k, p[k][0]) for k in p if k not in special)
    return token
def _url_concat(url, args):
    """Concatenate url and argument dictionary regardless of whether
    url has existing query parameters.

    An empty or missing ``args`` returns ``url`` unchanged.

    >>> _url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'

    :param url:
        The base URL, with or without a query string.
    :param args:
        Dictionary of query arguments to append.
    :returns:
        The URL with the encoded arguments appended.
    """
    if not args:
        return url
    # endswith() also copes with an empty url, unlike url[-1].
    if not url.endswith(('?', '&')):
        url += '&' if ('?' in url) else '?'
    return url + urllib.urlencode(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment