Skip to content

Instantly share code, notes, and snippets.

@pr1001
Created August 14, 2009 19:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save pr1001/168067 to your computer and use it in GitHub Desktop.
Save pr1001/168067 to your computer and use it in GitHub Desktop.
Django code for federated (OpenID + OAuth) authentication with the Google Data APIs
# Module import statements
# Python
import urllib
import urlparse
import pickle
import logging
# App Engine
from google.appengine.ext import db
# Django
from django.http import HttpResponse, HttpResponseRedirect
from django.conf import settings
import django.core.urlresolvers
# gacounter
import gacounter.views
from gacounter.models import Person
# openidgae
import openidgae
import openidgae.views
import openidgae.store
from openid.consumer.consumer import Consumer
from openid.consumer import discover
# gdata
import gdata.auth
import gdata.service
import gdata.urlfetch
import gdata.alt
import gdata.alt.appengine
# Module-level GDataService client, created once at import time and reused by
# the views in this module (login_succeeded uses it for the OAuth token upgrade).
# NOTE(review): because it is module-level, any OAuth parameters or tokens set
# on it are presumably shared by every request served by this process --
# confirm that this is safe for concurrent users.
client = gdata.service.GDataService()
# Pass the client through gdata's App Engine helper (going by the helper's
# name, this adapts it to the App Engine runtime -- see gdata.alt.appengine).
gdata.alt.appengine.run_on_appengine(client)
# View method declarations
def login(request):
    """Start a federated OpenID + OAuth login against the configured provider.

    Begins an OpenID request on ``settings.GOOGLE_OPENID_DISCOVERY_POINT``,
    stores the pickled consumer state on the openidgae session, and returns a
    page that meta-refreshes the browser to the provider.  The provider URL is
    extended with an attribute-exchange request for the user's email and an
    OAuth extension request for the two configured Google Analytics scopes.

    Args:
        request: The incoming Django request.  An optional ``continue`` query
            parameter names the local path to return to after login; anything
            that is not a plain local path is replaced by ``/``.

    Returns:
        An ``HttpResponse`` holding the meta-refresh page, or the result of
        ``gacounter.views.index(request)`` when no discovery point is
        configured or the OpenID ``begin`` step raises ``DiscoveryFailure``.
    """
    # Local import: the module header only imports the response classes.
    from django.utils.html import escape

    openidgae.views.initOpenId()
    response = HttpResponse()

    discovery_url = settings.GOOGLE_OPENID_DISCOVERY_POINT
    if not discovery_url:
        return gacounter.views.index(request)

    consumer = Consumer({}, openidgae.views.get_store())
    try:
        auth_request = consumer.begin(discovery_url)
    except discover.DiscoveryFailure:
        # logging.exception records the message plus the exception details.
        logging.exception('Error with begin on %s', discovery_url)
        return gacounter.views.index(request)

    parts = list(urlparse.urlparse(openidgae.views.get_full_path(request)))
    # Path of the login_succeeded view with the leading "/" removed;
    # urlunparse puts the separator back after the host.
    parts[2] = django.core.urlresolvers.reverse(
        'authorization.views.login_succeeded')[1:]

    continue_url = request.GET.get('continue', '/')
    # Only allow plain local paths.  "//" would make the value a
    # protocol-relative URL, and browsers treat "\" like "/", so "/\host"
    # must be rejected as well to avoid an open redirect.
    if (not continue_url.startswith('/')
            or '//' in continue_url
            or '\\' in continue_url):
        continue_url = '/'
    parts[4] = 'continue=%s' % urllib.quote_plus(continue_url)
    parts[5] = ''
    return_to = urlparse.urlunparse(parts)
    # The realm is just scheme + host of the current request.
    realm = urlparse.urlunparse(parts[0:2] + [''] * 4)

    # Persist the consumer state so login_succeeded can complete the exchange.
    session = openidgae.get_session(request, response)
    session.openid_stuff = pickle.dumps(consumer.session)
    session.put()

    redirect_url = auth_request.redirectURL(realm, return_to)
    extension_params = [
        # Attribute exchange: ask for the user's email address.
        ('openid.ns.ext1', 'http://openid.net/srv/ax/1.0'),
        ('openid.ext1.mode', 'fetch_request'),
        ('openid.ext1.required', 'email'),
        ('openid.ext1.type.email', 'http://schema.openid.net/contact/email'),
        # OAuth extension: ask for a pre-approved request token.
        ('openid.ns.ext2', settings.GOOGLE_OPENID_OAUTH_URI),
        # Consumer key: the realm minus the http:// portion.
        ('openid.ext2.consumer', settings.GOOGLE_CONSUMER_KEY),
        # Scopes are space separated (encoded as "+" by urlencode).
        ('openid.ext2.scope', ' '.join((
            settings.GOOGLE_ANALYTICS_ACCOUNT_URI,
            settings.GOOGLE_ANALYTICS_DATA_URI,
        ))),
    ]
    redirect_url += '&' + urllib.urlencode(extension_params)

    # Send the redirect as a meta refresh because App Engine sometimes bombs
    # out on long redirect URLs.  The URL sits inside an HTML attribute, so it
    # is HTML-escaped ("&" -> "&amp;"); the browser decodes it back.
    response.write(
        '<html><head><meta http-equiv="refresh" content="0;url=%s"></head>'
        '<body></body></html>' % (escape(redirect_url),))
    return response
def login_succeeded(request):
    """Complete the OpenID login started by ``login`` and sign the user in.

    Restores the pickled consumer state from the openidgae session, completes
    the OpenID exchange, and on success finds or creates the ``Person`` for
    the returned identifier.  If the response carries an email (attribute
    exchange) it is stored on the person; if it carries an OAuth request
    token, the token is upgraded to an access token via the module-level
    gdata ``client`` and stored as ``person.accessToken``.

    Args:
        request: The Django request the provider redirected the browser to.
            An optional ``continue`` query parameter names the local path to
            redirect to afterwards.

    Returns:
        ``HttpResponseRedirect`` to the sanitised ``continue`` path on
        success, the openidgae main page with an error message when
        verification fails, or ``HttpResponseNotAllowed`` for non-GET
        requests (previously these fell through and returned ``None``).
    """
    # Local import: the module header only imports HttpResponse(Redirect).
    from django.http import HttpResponseNotAllowed

    openidgae.views.initOpenId()
    response = HttpResponse()

    if request.method != 'GET':
        return HttpResponseNotAllowed(['GET'])

    args = openidgae.views.args_to_dict(request.GET)
    # NOTE(review): the scheme is hard-coded to http; if the site is served
    # over https this may not match the return_to built in login() -- confirm.
    url = ('http://' + request.META['HTTP_HOST'] +
           django.core.urlresolvers.reverse(
               'authorization.views.login_succeeded'))

    session = openidgae.get_session(request, response)
    consumer_state = {}
    if session.openid_stuff:
        # The blob is what login() pickled into the session record.
        # NOTE(review): unpickling is only safe while that record cannot be
        # written by untrusted parties.
        try:
            consumer_state = pickle.loads(str(session.openid_stuff))
        except Exception:
            logging.exception('Discarding unreadable OpenID session state')
            session.openid_stuff = None
    session.put()

    consumer = Consumer(consumer_state, openidgae.views.get_store())
    auth_response = consumer.complete(args, url)
    if auth_response.status != 'success':
        return openidgae.views.show_main_page(
            request, 'OpenID verification failed :(')

    openid = auth_response.getDisplayIdentifier()
    person = Person.all().filter('openid = ', openid).get()
    if not person:
        person = Person()
        person.openid = openid

    # We got an email address (attribute exchange).
    if 'openid.ext1.value.email' in args:
        person.email = db.Email(args['openid.ext1.value.email'])

    # We got an OAuth signed request token.
    if 'openid.ext2.request_token' in args:
        # secret="" is essential: values such as None and False cause errors.
        signed_request_token = gdata.auth.OAuthToken(
            key=args['openid.ext2.request_token'], secret="")
        client.SetOAuthInputParameters(
            gdata.auth.OAuthSignatureMethod.HMAC_SHA1,
            settings.GOOGLE_CONSUMER_KEY,
            settings.GOOGLE_CONSUMER_SECRET)
        access_token = client.UpgradeToOAuthAccessToken(signed_request_token)
        person.accessToken = str(access_token)

    person.put()

    # Attach the person to the session and to the current request.
    login_session = openidgae.get_session(request, response)
    login_session.person = person.key()
    request.openidgae_logged_in_person = person

    pending_url = getattr(login_session, 'url', None)
    if pending_url:
        # This branch used to call AddWebsiteSubmit() and read self.request /
        # self.response.  Neither name is defined in this module, so it could
        # only raise NameError and abort the login.  Drop the pending URL.
        logging.warning(
            'Dropping pending session url %r: no handler is available',
            pending_url)
        login_session.url = None
    login_session.put()

    continue_url = request.GET.get('continue', '/')
    # Only allow plain local paths.  "//" would make the value a
    # protocol-relative URL, and browsers treat "\" like "/", so "/\host"
    # must be rejected as well to avoid an open redirect.
    if (not continue_url.startswith('/')
            or '//' in continue_url
            or '\\' in continue_url):
        continue_url = '/'
    return HttpResponseRedirect(continue_url)
def logout(request):
    """Handle a logout request by delegating to ``openidgae.views.LogoutSubmit``.

    Args:
        request: The incoming Django request, passed through unchanged.

    Returns:
        Whatever ``openidgae.views.LogoutSubmit`` returns for the request.
    """
    logout_response = openidgae.views.LogoutSubmit(request)
    return logout_response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment