Skip to content

Instantly share code, notes, and snippets.

@gorakhargosh
Created July 3, 2011 17:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save gorakhargosh/1062423 to your computer and use it in GitHub Desktop.
Save gorakhargosh/1062423 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2009 Facebook
# Copyright (C) 2010, 2011 Tipfy.org
# Copyright (C) 2011 Yesudeep Mangalapilly <yesudeep@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
try:
# Python 3.
from urllib.parse import urljoin
except ImportError:
# Python 2.5+
from urlparse import urljoin
from webapp2 import cached_property
from pyoauth.error import InvalidHttpRequestError
from pyoauth.oauth1 import Credentials
from pyoauth.oauth1.client import Client
class OAuthMixin(object):
"""
OAuth 1.0 handler mixin implementation.
"""
@property
def oauth_client_identifier(self):
raise NotImplementedError("This property must be overridden by the handler to return the shared secret string.")
@property
def oauth_client_shared_secret(self):
raise NotImplementedError("This property must be overridden by the handler to return the shared secret string.")
@cached_property
def oauth_client(self):
raise NotImplementedError("This cached_property must be overridden by the handler to return an OAuth client instance.")
@cached_property
def _oauth_client_credentials(self):
return Credentials(identifier=self.oauth_client_identifier,
shared_secret=self.oauth_client_shared_secret)
def authorize_redirect(self, callback_uri="oob", realm=None, **kwargs):
"""
Redirects the resource owner to obtain OAuth authorization for this
service.
You should call this method to log the user in, and then call
:func:`get_authenticated_user` in the handler you registered
as your callback URL to complete the authorization process.
This method sets a cookie called
``_oauth_temporary_credentials`` which is subsequently used (and
cleared) in :func:`get_authenticated_user` for security purposes.
:param callback_uri:
The callback URI.
:param realm:
The OAuth authorization realm.
:param kwargs:
Currently unused.
"""
callback_uri = callback_uri or "oob"
if callback_uri and callback_uri != "oob":
callback_uri = urljoin(self._oauth_request_full_url, callback_uri)
credentials_request = self.oauth_client.build_temporary_credentials_request(
realm=realm,
oauth_callback=callback_uri
)
self._oauth_fetch(credentials_request, self._on_oauth_temporary_credentials_response)
def get_authenticated_user(self, callback, realm=None):
"""
Gets the OAuth authorized user and access token on callback.
This method should be called from the handler for your registered
OAuth callback URL to complete the registration process. We call
callback with the authenticated user, which in addition to standard
attributes like 'name' includes the 'access_key' attribute, which
contains the OAuth access you can use to make authorized requests
to this service on behalf of the user.
:param callback:
The callback that will be called upon successful authorization
with the user object as its first argument.
"""
oauth_token = self._oauth_request_get("oauth_token")
oauth_verifier = self._oauth_request_get("oauth_verifier")
credentials = self._oauth_get_temporary_credentials_from_cookie()
if not credentials:
callback(None)
return
try:
self.oauth_client.check_verification_code(
temporary_credentials,
oauth_token,
oauth_verifier
)
except InvalidHttpRequestError, e:
logging.exception(e)
callback(None)
return
request = self.oauth_client.build_token_credentials_request(
temporary_credentials=temporary_credentials,
oauth_verifier=oauth_verifier,
realm=realm
)
self._oauth_fetch(request, self._on_oauth_token_credentials_response, callback=callback)
# Framework specific.
def _on_oauth_token_credentials_response(self, callback, response):
"""
Called when token credentials are received in an OAuth response.
:param response:
HTTP response received from the OAuth server.
:returns:
None
"""
if response:
params, credentials = \
self.oauth_client.parse_token_credentials_response(
status_code=response.status_code,
status=response.status,
body=response.content,
headers=response.headers
)
# Pending.
else:
logging.warning("OAuth token credentials could not be fetched.")
callback(None)
return
def _on_oauth_temporary_credentials_response(self, response):
"""
Called when temporary credentials are received in an OAuth response.
:param response:
HTTP response received from the OAuth server.
:returns:
None
"""
if response:
params, credentials = \
self.oauth_client.parse_temporary_credentials_response(
status_code=response.status_code,
status=response.status,
body=response.content,
headers=response.headers
)
self._oauth_set_temporary_credentials_cookie(credentials)
self._oauth_redirect(self.oauth_client.get_authorization_url(credentials))
else:
logging.warning("Could not get OAuth response when requesting temporary credentials.")
self._oauth_abort(500)
def _oauth_get_temporary_credentials_from_cookie(self, cookie_name="_oauth_temporary_credentials"):
# Get the temporary credentials stored in the secure cookie and clear
# the cookie.
credentials_cookie = self.session_store.get_secure_cookie(cookie_name)
if credentials_cookie:
self.response.delete_cookie(cookie_name)
return Credentials(**credentials_cookie)
else:
logging.warning("Missing OAuth temporary credentials cookie.")
return None
def _oauth_set_temporary_credentials_cookie(self, credentials, cookie_name="_oauth_temporary_credentials"):
self.session_store.set_secure_cookie(cookie_name, credentials.to_dict())
# Utilities.
@property
def _oauth_request_full_url(self):
return self.request.url
@property
def _oauth_request_path(self):
return self.request.path
def _oauth_request_get(self, argument):
return self.request.get(argument)
def _oauth_redirect(self, url):
self.redirect(url)
def _oauth_abort(self, status_code):
self.abort(status_code)
def _oauth_fetch(self, oauth_request, async_callback=None, *args, **kwargs):
"""
Fetches a response from the OAuth server for a given OAuth request.
Self contained HTTP request method can be replaced with
one for your Web framework.
:param oauth_request:
An instance of type :class:`pyoauth.http.RequestProxy`.
:param async_callback:
``None`` by default. Unused on App Engine. Useful when your request
fetching method is asynchronous. Set to a callback function which
has the following signature::
def handle_response(response):
pass
If callback is not set, the response is returned by the method.
:param args:
Any additional positional arguments to be passed to the
``async_callback``.
:param kwargs:
Any additional arguments to be passed to the ``async_callback``.
"""
if async_callback:
from functools import partial
from webapp2_auth.httpclient import AsyncHTTPClient
http = AsyncHTTPClient()
if args or kwargs:
async_callback = partial(async_callback, *args, **kwargs)
http.fetch(
url=oauth_request.url,
body=oauth_request.payload,
method=oauth_request.method,
headers=oauth_request.headers,
callback=async_callback,
deadline=10
)
else:
from google.appengine.api import urlfetch
try:
response = urlfetch.fetch(
url=oauth_request.url,
payload=oauth_request.payload,
method=oauth_request.method,
headers=oauth_request.headers,
deadline=10)
except urlfetch.DownloadError, e:
logging.exception(e)
response = None
return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment