Created
June 20, 2011 18:15
-
-
Save gorakhargosh/1036192 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
# | |
# Copyright (C) 2009 Facebook | |
# Copyright (C) 2010, 2011 Tipfy.org | |
# Copyright (C) 2011 Yesudeep Mangalapilly <yesudeep@gmail.com> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); you may | |
# not use this file except in compliance with the License. You may obtain | |
# a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | |
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | |
# License for the specific language governing permissions and limitations | |
# under the License. | |
# Keep this as close to the Facebook Tornado code as possible. | |
# Any changes upstream will be easier to merge in here as a result. | |
""" | |
Implementations of various third-party authentication schemes.

All the classes in this file are class mixins designed to be used
with the webapp-improved RequestHandler class. The primary methods for each
service are authenticate_redirect(), authorize_redirect(), and
get_authenticated_user(). The first two should be called to redirect the user
to, e.g., the OpenID authentication page on the third party service, and the
last should be called upon return to get the user data from the data returned
by the third party service.

They all take slightly different arguments due to the fact that all these
services implement authentication and authorization slightly differently.
See the individual service classes below for complete documentation.

Example usage for Google OpenID::

    class GoogleHandler(RequestHandler, GoogleMixin):
        def get(self):
            if self.request.get('openid.mode', None):
                self.get_authenticated_user(self._on_auth)
                return
            self.authenticate_redirect()

        def _on_auth(self, user):
            if not user:
                raise ValueError("Google Authentication failed.")
            # Save the user with ... self.session_store.set_secure_cookie()
""" | |
#import base64 | |
import binascii | |
import cgi | |
import functools | |
import hashlib | |
import hmac | |
import logging | |
import time | |
import urllib | |
import urlparse | |
import uuid | |
from google.appengine.api import urlfetch | |
class OpenIdMixin(object):
    """
    Abstract implementation of OpenID and Attribute Exchange.

    See GoogleMixin for example implementation.

    Subclasses must define ``_OPENID_ENDPOINT`` (the provider endpoint URL).
    The class this is mixed into must provide ``self.request`` (with
    ``params``, ``get()``, ``path``, ``url``, ``host`` and ``scheme``) and
    ``self.redirect()``.
    """

    # Implement this in subclasses.
    #_OPENID_ENDPOINT = None

    def authenticate_redirect(self, callback_uri=None, ax_attrs=None):
        """
        Redirects to the authentication URL for this service.

        After authentication, the service will redirect back to the given
        callback URI.

        We request the given attributes for the authenticated user by default
        (name, email, language, and username). If you don't need all those
        attributes for your app, you can request fewer with the ax_attrs
        keyword argument.

        :param callback_uri:
            The URL to redirect to after authentication. Defaults to the
            path of the current request.
        :param ax_attrs:
            List of Attribute Exchange attributes to be fetched.
        :returns:
            None
        """
        ax_attrs = ax_attrs or ["name", "email", "language", "username"]
        callback_uri = callback_uri or self.request.path
        args = self._openid_args(callback_uri, ax_attrs=ax_attrs)
        self.redirect(self._OPENID_ENDPOINT + "?" + urllib.urlencode(args))

    def get_authenticated_user(self, callback):
        """
        Fetches the authenticated user data upon redirect.

        This method should be called by the handler that handles the callback
        URL to which the service redirects when the authenticate_redirect()
        or authorize_redirect() methods are called.

        :param callback:
            A function that is called after the authentication attempt. It is
            called passing a dictionary with the requested user attributes or
            None if the authentication failed.
        """
        # Verify the OpenID response via direct request to the OP: replay
        # every received argument with the mode switched to
        # "check_authentication".
        args = dict((name, self._openid_last_value(values))
                    for name, values in self.request.params.iteritems())
        args["openid.mode"] = u"check_authentication"

        # Only the fetch itself is guarded, so an error raised while
        # dispatching to the callback is never mistaken for a download error.
        try:
            response = urlfetch.fetch(self._OPENID_ENDPOINT, deadline=10,
                                      method=urlfetch.POST,
                                      payload=urllib.urlencode(args))
        except urlfetch.DownloadError as e:
            logging.exception(e)
            response = None
        else:
            if response.status_code < 200 or response.status_code >= 300:
                logging.warning("Invalid OpenID response (%d): %s",
                                response.status_code, response.content)
                response = None
        self._on_authentication_verified(callback, response)

    def _openid_args(self, callback_uri, ax_attrs=None, oauth_scope=None):
        """
        Builds and returns the OpenID arguments used in the authentication
        request.

        :param callback_uri:
            The URL to redirect to after authentication. It is resolved
            against the URL of the current request.
        :param ax_attrs:
            List of Attribute Exchange attributes to be fetched. Supported
            names are "name", "email", "language" and "username".
        :param oauth_scope:
            OAuth scope.
        :returns:
            A dictionary of arguments for the authentication URL.
        :raises KeyError:
            If ``ax_attrs`` contains an unsupported attribute name.
        """
        ax_attrs = ax_attrs or []
        url = urlparse.urljoin(self.request.url, callback_uri)
        request_host = self.request.host
        request_protocol = self.request.scheme
        args = {
            "openid.ns": "http://specs.openid.net/auth/2.0",
            "openid.claimed_id":
                "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.identity":
                "http://specs.openid.net/auth/2.0/identifier_select",
            "openid.return_to": url,
            "openid.realm": request_protocol + "://" + request_host + "/",
            "openid.mode": "checkid_setup",
        }
        if ax_attrs:
            args.update({
                "openid.ns.ax": "http://openid.net/srv/ax/1.0",
                "openid.ax.mode": "fetch_request",
            })
            ax_attrs = set(ax_attrs)
            required = []
            if "name" in ax_attrs:
                # "name" expands to the three name-related AX attributes.
                ax_attrs -= set(["name", "firstname", "fullname", "lastname"])
                required += ["firstname", "fullname", "lastname"]
                args.update({
                    "openid.ax.type.firstname":
                        "http://axschema.org/namePerson/first",
                    "openid.ax.type.fullname":
                        "http://axschema.org/namePerson",
                    "openid.ax.type.lastname":
                        "http://axschema.org/namePerson/last",
                })
            known_attrs = {
                "email": "http://axschema.org/contact/email",
                "language": "http://axschema.org/pref/language",
                "username": "http://axschema.org/namePerson/friendly",
            }
            for name in ax_attrs:
                args["openid.ax.type." + name] = known_attrs[name]
                required.append(name)
            args["openid.ax.required"] = ",".join(required)
        if oauth_scope:
            args.update({
                "openid.ns.oauth":
                    "http://specs.openid.net/extensions/oauth/1.0",
                # The consumer is the request host without any port.
                "openid.oauth.consumer": request_host.split(":")[0],
                "openid.oauth.scope": oauth_scope,
            })
        return args

    def _on_authentication_verified(self, callback, response):
        """
        Called after the authentication attempt. It calls the callback function
        set when the authentication process started, passing a dictionary of
        user data if the authentication was successful or None if it failed.

        :param callback:
            A function that is called after the authentication attempt.
        :param response:
            The provider's response to the ``check_authentication`` request
            (an object with ``status_code`` and ``content``), or None when no
            usable response was received.
        """
        if not response:
            logging.warning("Missing OpenID response.")
            callback(None)
            return
        # The provider confirms a positive assertion with the key-value line
        # "is_valid:true".
        if response.status_code < 200 or response.status_code >= 300 or \
                u"is_valid:true" not in response.content:
            logging.warning("Invalid OpenID response (%d): %s",
                            response.status_code, response.content)
            callback(None)
            return

        claimed_id = self.request.get("openid.claimed_id", u"")

        # Find the alias under which the Attribute Exchange namespace was
        # returned ("openid.ns.<alias>").
        ax_ns = None
        for name, values in self.request.params.iteritems():
            if name.startswith("openid.ns.") and \
                    self._openid_last_value(values) == \
                    u"http://openid.net/srv/ax/1.0":
                ax_ns = name[10:]
                break

        email = self._get_ax_arg("http://axschema.org/contact/email", ax_ns)
        name = self._get_ax_arg("http://axschema.org/namePerson", ax_ns)
        first_name = self._get_ax_arg("http://axschema.org/namePerson/first",
                                      ax_ns)
        last_name = self._get_ax_arg("http://axschema.org/namePerson/last",
                                     ax_ns)
        username = self._get_ax_arg("http://axschema.org/namePerson/friendly",
                                    ax_ns)
        locale = self._get_ax_arg("http://axschema.org/pref/language",
                                  ax_ns).lower()

        user = dict()
        name_parts = []
        if first_name:
            user["first_name"] = first_name
            name_parts.append(first_name)
        if last_name:
            user["last_name"] = last_name
            name_parts.append(last_name)
        # Display name preference: full name, then "first last", then the
        # local part of the email address.
        if name:
            user["name"] = name
        elif name_parts:
            user["name"] = u" ".join(name_parts)
        elif email:
            user["name"] = email.split("@")[0]
        if email:
            user["email"] = email
        if locale:
            user["locale"] = locale
        if username:
            user["username"] = username
        # Get the claimed ID. Not in facebook code. Borrowed from Tipfy.
        user["claimed_id"] = claimed_id
        callback(user)

    def _get_ax_arg(self, uri, ax_ns):
        """
        Returns an Attribute Exchange value from the request.

        :param uri:
            Attribute Exchange URI.
        :param ax_ns:
            Attribute Exchange namespace alias (may be None).
        :returns:
            The Attribute Exchange value if found in the request, otherwise
            an empty unicode string.
        """
        if not ax_ns:
            return u""
        prefix = "openid." + ax_ns + ".type."
        ax_name = None
        for name, values in self.request.params.iteritems():
            if self._openid_last_value(values) == uri and \
                    name.startswith(prefix):
                # "openid.<ns>.type.<part>" announces the type; the value is
                # carried by "openid.<ns>.value.<part>".
                part = name[len(prefix):]
                ax_name = "openid." + ax_ns + ".value." + part
                break
        if not ax_name:
            return u""
        return self.request.get(ax_name, u"")

    @staticmethod
    def _openid_last_value(values):
        """
        Normalizes a request parameter value to a single value.

        Tornado-style request arguments map each name to a list of values,
        in which case the last one is used. Plain (string) values are
        returned unchanged instead of being indexed, which would yield only
        their last character.

        :param values:
            A list/tuple of values or a single value.
        :returns:
            The last element of a non-empty list/tuple, an empty unicode
            string for an empty list/tuple, or ``values`` itself.
        """
        if isinstance(values, (list, tuple)):
            return values[-1] if values else u""
        return values
class OAuthMixin(object):
    """
    Abstract implementation of OAuth.

    See TwitterMixin and FriendFeedMixin for example implementations.

    Subclasses define the ``_OAUTH_*`` attributes listed below and override
    ``_oauth_consumer_token()`` and ``_oauth_get_user()``. The class this is
    mixed into must provide ``self.request``, ``self.response``,
    ``self.session_store``, ``self.redirect()`` and ``self.abort()``.
    """

    # Define these in the subclasses.
    #_OAUTH_VERSION = "1.0a"
    #_OAUTH_AUTHORIZE_URL = None
    #_OAUTH_ACCESS_TOKEN_URL = None
    #_OAUTH_REQUEST_TOKEN_URL = None
    #_OAUTH_NO_CALLBACKS = False

    def authorize_redirect(self, callback_uri=None, extra_params=None):
        """
        Redirects the user to obtain OAuth authorization for this service.

        Twitter and FriendFeed both require that you register a callback
        URL with your application. You should call this method to log the user
        in, and then call get_authenticated_user() in the handler you
        registered as your callback URL to complete the authorization process.

        This method sets a cookie called _oauth_request_token which is
        subsequently used (and cleared) in get_authenticated_user for security
        purposes.

        :param callback_uri:
            Callback URI.
        :param extra_params:
            Extra parameters. Only sent when the OAuth version is "1.0a".
        :raises Exception:
            If a callback URI is given but the service sets
            ``_OAUTH_NO_CALLBACKS``.
        """
        if callback_uri and getattr(self, "_OAUTH_NO_CALLBACKS", False):
            raise Exception("This service does not support oauth_callback")
        if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
            url = self._oauth_request_token_url(callback_uri=callback_uri,
                                                extra_params=extra_params)
        else:
            url = self._oauth_request_token_url()
        try:
            response = urlfetch.fetch(url, deadline=10)
        except urlfetch.DownloadError as e:
            logging.exception(e)
            response = None
        self._on_request_token(self._OAUTH_AUTHORIZE_URL, callback_uri,
                               response)

    def get_authenticated_user(self, callback):
        """
        Gets the OAuth authorized user and access token on callback.

        This method should be called from the handler for your registered
        OAuth callback URL to complete the registration process. We call
        callback with the authenticated user, which in addition to standard
        attributes like 'name' includes the 'access_token' attribute, which
        contains the OAuth access token you can use to make authorized
        requests to this service on behalf of the user.

        :param callback:
            The callback that will be called with the user dictionary upon
            successful authorization, or with None on failure.
        """
        request_key = self.request.get("oauth_token")
        oauth_verifier = self.request.get("oauth_verifier", None)
        request_cookie = self.session_store.get_secure_cookie(
            "_oauth_request_token")
        if not request_cookie:
            logging.warning("Missing OAuth request token cookie")
            callback(None)
            return
        # The request token cookie is single-use.
        self.response.delete_cookie("_oauth_request_token")
        # The cookie holds a dict with "key" and "secret" (see
        # _on_request_token), not Tornado's base64 "key|secret" string.
        cookie_key = request_cookie["key"]
        cookie_secret = request_cookie["secret"]
        if cookie_key != request_key:
            logging.warning("Request token does not match cookie")
            callback(None)
            return
        token = dict(key=cookie_key, secret=cookie_secret)
        if oauth_verifier:
            token["verifier"] = oauth_verifier
        url = self._oauth_access_token_url(token)
        try:
            response = urlfetch.fetch(url, deadline=10)
        except urlfetch.DownloadError as e:
            logging.exception(e)
            response = None
        self._on_access_token(callback, response)

    def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
        """
        Obtains the OAuth request token URL along with the query parameters
        filled in.

        :param callback_uri:
            Where the service will redirect after returning a request token.
        :param extra_params:
            Extra query parameters you want to send along with the request.
        :returns:
            A properly encoded request token URL.
        """
        oauth_version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_REQUEST_TOKEN_URL
        args = {
            "oauth_consumer_key": consumer_token["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": oauth_version,
        }
        if oauth_version == "1.0a":
            # In 1.0a the callback (and any extras) are part of the signed
            # request-token request.
            if callback_uri:
                args["oauth_callback"] = urlparse.urljoin(self.request.url,
                                                          callback_uri)
            if extra_params:
                args.update(extra_params)
            signature = _oauth10a_signature(consumer_token, "GET", url, args)
        else:
            signature = _oauth_signature(consumer_token, "GET", url, args)
        args["oauth_signature"] = signature
        return url + "?" + urllib.urlencode(args)

    def _on_request_token(self, authorize_url, callback_uri, response):
        """
        When a request token response is received.

        Stores the request token in a secure cookie and redirects the user to
        the authorization URL. Aborts with a 500 status when no usable
        response was received.

        :param authorize_url:
            The authorization URL to redirect to obtain an access token.
        :param callback_uri:
            The callback URI.
        :param response:
            OAuth response, or None if the fetch failed.
        :returns:
            None
        """
        if not response:
            logging.warning("Could not get OAuth request token.")
            self.abort(500)
            # Defensive: never fall through should abort() return normally.
            return
        if response.status_code < 200 or response.status_code >= 300:
            logging.warning(
                "Bad OAuth response when requesting a token (%d): %s",
                response.status_code, response.content)
            self.abort(500)
            return
        request_token = _oauth_parse_response(response.content)
        data = {
            "key": request_token["key"],
            "secret": request_token["secret"],
        }
        self.session_store.set_secure_cookie("_oauth_request_token", data)
        args = dict(oauth_token=request_token["key"])
        if callback_uri:
            args["oauth_callback"] = urlparse.urljoin(self.request.url,
                                                      callback_uri)
        self.redirect(authorize_url + "?" + urllib.urlencode(args))

    def _oauth_access_token_url(self, request_token):
        """
        Obtains the OAuth Access token URL with query parameters filled in.

        :param request_token:
            The OAuth request token obtained in a previous step: a dictionary
            with "key" and "secret" and optionally "verifier".
        :returns:
            Properly encoded access token URL.
        """
        oauth_version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer_token = self._oauth_consumer_token()
        url = self._OAUTH_ACCESS_TOKEN_URL
        args = {
            "oauth_consumer_key": consumer_token["key"],
            "oauth_token": request_token["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": oauth_version,
        }
        if "verifier" in request_token:
            args["oauth_verifier"] = request_token["verifier"]
        if oauth_version == "1.0a":
            signature = _oauth10a_signature(consumer_token, "GET", url, args,
                                            request_token)
        else:
            signature = _oauth_signature(consumer_token, "GET", url, args,
                                         request_token)
        args["oauth_signature"] = signature
        return url + "?" + urllib.urlencode(args)

    def _on_access_token(self, callback, response):
        """
        When an access token response is received.

        :param callback:
            Called with None when the access token could not be fetched;
            otherwise handed on (via ``_on_oauth_get_user``) to
            ``_oauth_get_user``.
        :param response:
            OAuth response, or None if the fetch failed.
        """
        if not response:
            logging.warning("Could not fetch access token")
            callback(None)
            return
        if response.status_code < 200 or response.status_code >= 300:
            logging.warning(
                "Bad OAuth response trying to get access token (%d): %s",
                response.status_code, response.content)
            callback(None)
            return
        access_token = _oauth_parse_response(response.content)
        self._oauth_get_user(access_token,
                             functools.partial(self._on_oauth_get_user,
                                               access_token, callback))

    def _oauth_consumer_token(self):
        """
        Returns the consumer token for this service handler.

        Needs to be overridden. The other methods of this class read the
        "key" entry of the returned token.
        """
        raise NotImplementedError()

    def _oauth_get_user(self, access_token, callback):
        """
        Obtains the user data from OAuth access token.

        Needs to be overridden.

        :param access_token:
            OAuth access token
        :param callback:
            Callback that will be given the data.
        """
        raise NotImplementedError()

    def _on_oauth_get_user(self, access_token, callback, user):
        """
        Attaches the access token to the fetched user and reports it.

        :param access_token:
            Access token.
        :param callback:
            Called with the user dictionary (with an added "access_token"
            entry), or with None if ``user`` is empty.
        :param user:
            The user dictionary produced by ``_oauth_get_user``.
        """
        if not user:
            callback(None)
            return
        user["access_token"] = access_token
        callback(user)

    def _oauth_request_parameters(self, url, access_token, parameters=None,
                                  method="GET"):
        """
        Returns the OAuth parameters as a dict for the given request.

        Parameters should include all POST arguments and query string arguments
        that will be sent with the request. They take part in the signature
        but are not included in the returned dictionary.

        :param url:
            Request URL.
        :param access_token:
            OAuth Access Token
        :param parameters:
            A dictionary of query parameters.
        :param method:
            HTTP Method.
        :returns:
            A dictionary with the ``oauth_*`` parameters, including
            ``oauth_signature``.
        """
        parameters = parameters or {}
        oauth_version = getattr(self, "_OAUTH_VERSION", "1.0a")
        consumer_token = self._oauth_consumer_token()
        base_args = {
            "oauth_consumer_key": consumer_token["key"],
            "oauth_token": access_token["key"],
            "oauth_signature_method": "HMAC-SHA1",
            "oauth_timestamp": _oauth_generate_timestamp(),
            "oauth_nonce": _oauth_generate_nonce(),
            "oauth_version": oauth_version,
        }
        # Sign over the OAuth arguments plus the caller's own parameters.
        args = {}
        args.update(base_args)
        args.update(parameters)
        if oauth_version == "1.0a":
            signature = _oauth10a_signature(consumer_token, method, url, args,
                                            access_token)
        else:
            signature = _oauth_signature(consumer_token, method, url, args,
                                         access_token)
        base_args["oauth_signature"] = signature
        return base_args
class OAuth2Mixin(object):
    """
    Abstract implementation of OAuth v 2.

    Reads ``_OAUTH_AUTHORIZE_URL`` and ``_OAUTH_REQUEST_TOKEN_URL`` from the
    class it is mixed into, and uses its ``redirect()`` method.
    """

    def authorize_redirect(self, redirect_uri=None, client_id=None,
                           client_secret=None, extra_params=None):
        """
        Redirects the user to obtain OAuth authorization for this service.

        Some providers require that you register a callback URL with your
        application. You should call this method to log the user in, and then
        call get_authenticated_user() in the handler you registered as your
        callback URL to complete the authorization process.

        :param redirect_uri:
            Sent as the ``redirect_uri`` query parameter.
        :param client_id:
            Sent as the ``client_id`` query parameter.
        :param client_secret:
            Accepted but not used by this method.
        :param extra_params:
            Optional dictionary of additional query parameters.
        """
        query = {
            "redirect_uri": redirect_uri,
            "client_id": client_id,
        }
        if extra_params:
            query.update(extra_params)
        target = _url_concat(self._OAUTH_AUTHORIZE_URL, query)
        self.redirect(target)

    def _oauth_request_token_url(self, redirect_uri=None, client_id=None,
                                 client_secret=None, code=None,
                                 extra_params=None):
        """
        Obtains the OAuth request token URL.

        :param redirect_uri:
            Sent as the ``redirect_uri`` query parameter.
        :param client_id:
            Sent as the ``client_id`` query parameter.
        :param client_secret:
            Sent as the ``client_secret`` query parameter.
        :param code:
            Sent as the ``code`` query parameter.
        :param extra_params:
            Optional dictionary of additional query parameters.
        :returns:
            Properly encoded request token URL.
        """
        query = {
            "redirect_uri": redirect_uri,
            "code": code,
            "client_id": client_id,
            "client_secret": client_secret,
        }
        if extra_params:
            query.update(extra_params)
        return _url_concat(self._OAUTH_REQUEST_TOKEN_URL, query)
def _oauth_signature(consumer_token, method, url, parameters=None, token=None):
    """
    Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process

    :param consumer_token:
        Dictionary holding the consumer "secret".
    :param method:
        HTTP method.
    :param url:
        Request URL.
    :param parameters:
        Dictionary of request parameters to sign.
    :param token:
        Optional token dictionary holding a "secret".
    :returns:
        Base64-encoded HMAC-SHA1 signature.
    """
    if not parameters:
        parameters = {}
    signature_base = _oauth_get_signature_base_string(url, method, parameters)
    # The signing key is "<consumer secret>&<token secret>"; the token part
    # stays empty when there is no token. Secrets are used unescaped here.
    token_secret = token["secret"] if token else ""
    signing_key = "&".join([consumer_token["secret"], token_secret])
    return _oauth_hmac_sha1_base64_digest(signing_key, signature_base)
def _oauth10a_signature(consumer_token, method, url, parameters=None,
                        token=None):
    """
    Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process

    :param consumer_token:
        Dictionary holding the consumer "secret".
    :param method:
        HTTP method.
    :param url:
        Request URL.
    :param parameters:
        Dictionary of request parameters to sign.
    :param token:
        Optional token dictionary holding a "secret".
    :returns:
        Base64-encoded HMAC-SHA1 signature.
    """
    if not parameters:
        parameters = {}
    signature_base = _oauth_get_signature_base_string(url, method, parameters)
    # Unlike the 1.0 variant, both secrets are escaped before being joined
    # into the "<consumer secret>&<token secret>" signing key.
    escaped_consumer_secret = _oauth_escape(consumer_token["secret"])
    escaped_token_secret = _oauth_escape(token["secret"]) if token else ""
    signing_key = "&".join([escaped_consumer_secret, escaped_token_secret])
    return _oauth_hmac_sha1_base64_digest(signing_key, signature_base)
def _oauth_hmac_sha1_base64_digest(key, base_string): | |
""" | |
Calculates a base-64 encoded HMAC-SHA1 digest for the given | |
OAuth key and base string. | |
:param key: | |
Key query string. | |
:param base_string: | |
Base query string. | |
:returns: | |
Base64-encoded HMAC-SHA1 digest. | |
""" | |
hashed = hmac.new(key, base_string, hashlib.sha1) | |
return binascii.b2a_base64(hashed.digest())[:-1] | |
def _oauth_get_signature_base_string(url, method, parameters):
    """
    Builds the OAuth signature base string for a request.

    The result is the upper-cased HTTP method, the normalized URL and the
    sorted, "&"-joined ``name=value`` pairs, each escaped and joined by "&".

    :param url:
        The URL.
    :param method:
        HTTP Method.
    :param parameters:
        Query string parameters.
    :returns:
        OAuth request base string.
    """
    normalized_url = _oauth_get_normalized_url(url)
    pairs = []
    for key, value in sorted(parameters.items()):
        # Only the value is escaped here; the whole query string is escaped
        # once more when the three components are joined below.
        pairs.append("%s=%s" % (key, _oauth_escape(str(value))))
    query_string = "&".join(pairs)
    components = [method.upper(), normalized_url, query_string]
    return "&".join([_oauth_escape(component) for component in components])
def _oauth_get_normalized_url(url): | |
""" | |
Normalizes a URL that will be used in the oauth signature. | |
Doctests:: | |
>>> | |
:param url: | |
The URL to normalize. | |
:returns: | |
Normalized URL. | |
""" | |
parts = urlparse.urlparse(url) | |
scheme, netloc, path = parts[:3] | |
normalized_url = scheme.lower() + "://" + netloc.lower() + path | |
return normalized_url | |
def _oauth_escape(val):
    """
    Escapes the value of a query string parameter according to the OAuth spec.

    Unicode values are encoded as UTF-8 before being quoted.

    :param val:
        Query string parameter value to escape.
    :returns:
        String representing escaped value.
    """
    encoded = val.encode("utf-8") if isinstance(val, unicode) else val
    return _oauth_quote(encoded)
def _oauth_quote(val, safe="~"): | |
""" | |
Quotes a URL value as per the OAuth spec. | |
:param val: | |
The value to quote. | |
:param safe: | |
The safe characters. | |
:returns: | |
Quoted value. | |
""" | |
return urllib.quote(val, safe=safe) | |
def _oauth_generate_nonce(): | |
""" | |
Calculates an OAuth nonce. | |
:returns: | |
OAuth nonce string. | |
""" | |
return binascii.b2a_hex(uuid.uuid4().bytes) | |
def _oauth_generate_timestamp(): | |
""" | |
Returns a string timestamp value according to the OAuth spec. | |
:returns: | |
OAuth timestamp integer as string. | |
""" | |
return str(int(time.time())) | |
def _oauth_parse_response(body): | |
""" | |
Parses token out of the OAuth response body. | |
:param body: | |
The OAuth response body. | |
:returns: | |
A token as a dictionary. | |
""" | |
p = cgi.parse_qs(body, keep_blank_values=False) | |
token = dict(key=p["oauth_token"][0], secret=p["oauth_token_secret"][0]) | |
special = ("oauth_token", "oauth_token_secret") | |
token.update((k, p[k][0]) for k in p if k not in special) | |
return token | |
def _url_concat(url, args): | |
"""Concatenate url and argument dictionary regardless of whether | |
url has existing query parameters. | |
>>> url_concat("http://example.com/foo?a=b", dict(c="d")) | |
'http://example.com/foo?a=b&c=d' | |
""" | |
if not args: | |
return url | |
if url[-1] not in ('?', '&'): | |
url += '&' if ('?' in url) else '?' | |
return url + urllib.urlencode(args) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment