Skip to content

Instantly share code, notes, and snippets.

@epsy
Created September 23, 2013 00:58
Show Gist options
  • Save epsy/6665392 to your computer and use it in GitHub Desktop.
Save epsy/6665392 to your computer and use it in GitHub Desktop.
AAGID OpenID Gateway consumer example
from collections import MutableMapping
from openid.consumer import consumer, discover
from openid.store.filestore import FileOpenIDStore
from flask import request, redirect, session
class OidSessionWrapper(MutableMapping):
    """Encode and decode python-openid-specific types to and from
    JSON-serializable types.

    Wraps a session-like mapping. When a value is stored under ``key`` and
    this class defines a ``serialize_<key>`` method, the value is passed
    through it first; when a value is read and an ``unserialize_<key>``
    method exists, the stored value is passed through that before being
    returned. Keys without a matching hook are stored and returned as-is.
    """

    def __init__(self, session):
        # The underlying mapping that actually holds the data.
        self.session = session

    def __setitem__(self, key, val):
        """Store ``val`` under ``key``, serializing it if a hook exists."""
        serialize = getattr(self, 'serialize_' + key, None)
        if serialize is not None:
            val = serialize(val)
        self.session[key] = val

    def __getitem__(self, key):
        """Return the value for ``key``, unserializing it if a hook exists.

        Raises KeyError (from the wrapped mapping) when ``key`` is absent.
        """
        stored = self.session[key]
        unserialize = getattr(self, 'unserialize_' + key, None)
        if unserialize is None:
            return stored
        return unserialize(stored)

    def __delitem__(self, key):
        del self.session[key]

    def __iter__(self):
        return iter(self.session)

    def __len__(self):
        return len(self.session)

    # Hooks for the '_openid_consumer_last_token' key. The method names are
    # 'serialize_' / 'unserialize_' followed by the key itself, hence the
    # double underscore.

    def serialize__openid_consumer_last_token(self, obj):
        """Return the attribute dict of ``obj`` as its storable form."""
        return obj.__dict__

    def unserialize__openid_consumer_last_token(self, serialized):
        """Rebuild an ``OpenIDServiceEndpoint`` from its attribute dict."""
        obj = discover.OpenIDServiceEndpoint()
        obj.__dict__.update(serialized)
        # The rebuilt endpoint must be returned: __getitem__ hands this
        # result straight to the caller, so omitting the return made every
        # read of the stored token yield None.
        return obj
# Module-level wrapper around the `session` object imported from flask;
# AagidOpenidGateway.make_consumer() passes it to every Consumer it builds.
session_for_openid = OidSessionWrapper(session)
class AagidOpenidGateway(object):
    """OpenID consumer tied to one pre-configured provider endpoint.

    ``path`` is the directory given to ``FileOpenIDStore``, ``prefix`` is
    the leading part every accepted identity URL must share, and
    ``endpoint`` is the provider endpoint URL used to start requests
    without running discovery.
    """

    def __init__(self, path, prefix, endpoint):
        self.path = path
        self.prefix = prefix
        self.endpoint_url = endpoint
        # Build the endpoint object once; request_auth() reuses it.
        make_endpoint = discover.OpenIDServiceEndpoint.fromOPEndpointURL
        self.endpoint = make_endpoint(endpoint)

    def make_consumer(self):
        """Return a new Consumer using the wrapped session and a file store."""
        store = FileOpenIDStore(self.path)
        return consumer.Consumer(session_for_openid, store)

    def request_auth(self, return_url):
        """Begin authentication and return the response to send the user.

        The result is either a redirect to the provider or HTML markup
        produced by the auth request, depending on what the request
        itself reports via ``shouldSendRedirect()``.
        """
        auth_request = self.make_consumer().beginWithoutDiscovery(
            self.endpoint)
        root = request.url_root
        if not auth_request.shouldSendRedirect():
            return auth_request.htmlMarkup(root, return_url)
        return redirect(auth_request.redirectURL(root, return_url))

    def check_return(self):
        """Complete authentication from the current request's query args.

        Returns the global ID on success, ``None`` for any other status.
        ``ValueError`` from ``oid_to_gid`` is not caught here.
        """
        response = self.make_consumer().complete(
            request.args, request.base_url)
        if response.status != 'success':
            return None
        return self.oid_to_gid(response.identity_url)

    def oid_to_gid(self, oid):
        """Strip ``self.prefix`` and surrounding slashes from ``oid``.

        Raises ``ValueError(oid)`` when ``oid`` does not start with the
        configured prefix.
        """
        if oid.startswith(self.prefix):
            remainder = oid[len(self.prefix):]
            return remainder.strip('/')
        raise ValueError(oid)
#!/usr/bin/env python
import os
from flask import Flask, url_for
from aagidgw import AagidOpenidGateway
# The demo application and the gateway it authenticates through.
app = Flask(__name__)

# The OpenID store directory lives under the Flask instance path; identity
# URLs are expected to start with the given prefix.
gidgw = AagidOpenidGateway(
    path=os.path.join(app.instance_path, 'oid'),
    prefix='http://auth.hashpickup.net/',
    endpoint='http://auth.hashpickup.net/openid',
)
@app.route('/')
def front():
    """Serve the landing page: a form whose single button POSTs to /try."""
    button = (
        '<input type="submit" name="submit" value="Login with '
        'an Armagetron Advanced Global ID" />'
    )
    return '<form method="post" action="/try">' + button + '</form>'
@app.route('/try', methods=['POST'])
def try_():
    """Start authentication, asking to be sent back to ``return_point``."""
    # An absolute URL is requested because it is handed to the gateway as
    # the address the user should come back to.
    return_url = url_for('return_point', _external=True)
    return gidgw.request_auth(return_url)
@app.route('/return')
def return_point():
    """Finish authentication and greet the user, or report cancellation.

    Returns an HTML fragment. Any exception raised by
    ``gidgw.check_return()`` is not handled here and propagates.
    """
    # Imported locally so the block is self-contained on both Python lines.
    try:
        from html import escape  # Python 3
    except ImportError:
        from cgi import escape  # Python 2

    gid = gidgw.check_return()
    if gid:
        # gid is derived from the identity URL received in the request, so
        # it must be escaped before being placed in an HTML response.
        message = "Hello {0}!".format(escape(gid))
    else:
        message = "Authentication cancelled :("
    return '{0} <a href="/">Try again.</a>'.format(message)
if __name__ == '__main__':
    # A fresh random key is generated on every start, and only when the
    # file is run directly as a script.
    app.secret_key = os.urandom(24)
    # Debug mode is switched on for this example run.
    app.debug = True
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment