Created
September 23, 2013 00:58
-
-
Save epsy/6665392 to your computer and use it in GitHub Desktop.
AAGID OpenID Gateway consumer example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import MutableMapping | |
from openid.consumer import consumer, discover | |
from openid.store.filestore import FileOpenIDStore | |
from flask import request, redirect, session | |
class OidSessionWrapper(MutableMapping):
    """Encode and decode python-openid-specific types to and from
    JSON-serializable types.

    Wraps a session-like mutable mapping. For a key ``K``, a method named
    ``serialize_K`` (if defined on this class) converts the value before it
    is written to the underlying session, and a method named
    ``unserialize_K`` converts it back when it is read. Keys without such
    hooks are stored and returned unchanged.
    """

    def __init__(self, session):
        """Wrap *session*, the mapping that actually holds the data."""
        self.session = session

    def __setitem__(self, key, val):
        """Store *val* under *key*, serializing it first if a hook exists."""
        serializer = getattr(self, 'serialize_' + key, None)
        if serializer is not None:
            val = serializer(val)
        self.session[key] = val

    def __getitem__(self, key):
        """Return the value for *key*, unserializing it if a hook exists.

        Raises whatever the underlying session raises for a missing key.
        """
        raw = self.session[key]
        unserializer = getattr(self, 'unserialize_' + key, None)
        if unserializer is None:
            return raw
        return unserializer(raw)

    def __delitem__(self, key):
        """Remove *key* from the underlying session."""
        del self.session[key]

    def __iter__(self):
        """Iterate over the keys of the underlying session."""
        return iter(self.session)

    def __len__(self):
        """Return the number of entries in the underlying session."""
        return len(self.session)

    def serialize__openid_consumer_last_token(self, obj):
        """Hook for the ``_openid_consumer_last_token`` key.

        Returns the object's attribute dictionary (the same dict object,
        not a copy), which is what ends up in the session.
        """
        return obj.__dict__

    def unserialize__openid_consumer_last_token(self, serialized):
        """Hook for the ``_openid_consumer_last_token`` key.

        Rebuilds a ``discover.OpenIDServiceEndpoint`` whose attributes are
        taken from the *serialized* dictionary.
        """
        obj = discover.OpenIDServiceEndpoint()
        obj.__dict__.update(serialized)
        # The rebuilt endpoint must be returned; without this, reads of the
        # key (including ``.get()``) would always produce None.
        return obj
# Module-level wrapper around Flask's ``session``. It is handed to the
# python-openid consumer in place of the raw session so that values pass
# through OidSessionWrapper's per-key serialize/unserialize hooks.
session_for_openid = OidSessionWrapper(session)
class AagidOpenidGateway(object):
    """OpenID consumer bound to one fixed provider endpoint.

    Authentication requests are started without discovery against the
    configured endpoint, and successful responses are turned into a short
    identifier by stripping a fixed URL prefix from the identity URL.
    """

    def __init__(self, path, prefix, endpoint):
        """Remember the configuration and build the provider endpoint.

        path     -- directory passed to ``FileOpenIDStore``
        prefix   -- URL prefix every accepted identity URL must start with
        endpoint -- URL of the OpenID provider endpoint
        """
        self.path, self.prefix = path, prefix
        self.endpoint_url = endpoint
        self.endpoint = discover.OpenIDServiceEndpoint.fromOPEndpointURL(
            endpoint)

    def make_consumer(self):
        """Return a new ``consumer.Consumer`` using the wrapped session and
        a file-based store rooted at ``self.path``."""
        store = FileOpenIDStore(self.path)
        return consumer.Consumer(session_for_openid, store)

    def request_auth(self, return_url):
        """Start authentication and return what the view should respond with.

        Returns a redirect response when the request says a redirect should
        be sent, otherwise the HTML markup produced by the request. In both
        cases ``request.url_root`` and *return_url* are passed along, in
        that order.
        """
        auth_request = self.make_consumer().beginWithoutDiscovery(
            self.endpoint)
        if not auth_request.shouldSendRedirect():
            return auth_request.htmlMarkup(request.url_root, return_url)
        target = auth_request.redirectURL(request.url_root, return_url)
        return redirect(target)

    def check_return(self):
        """Complete the authentication using the current request.

        Returns the identifier extracted from the identity URL when the
        response status is ``'success'``, and None for any other status.
        """
        response = self.make_consumer().complete(
            request.args, request.base_url)
        if response.status != 'success':
            return None
        return self.oid_to_gid(response.identity_url)

    def oid_to_gid(self, oid):
        """Strip ``self.prefix`` and surrounding slashes from *oid*.

        Raises ValueError (carrying *oid*) when *oid* does not start with
        the configured prefix.
        """
        if oid.startswith(self.prefix):
            remainder = oid[len(self.prefix):]
            return remainder.strip('/')
        raise ValueError(oid)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
from flask import Flask, url_for | |
from aagidgw import AagidOpenidGateway | |
# The Flask application served by this example.
app = Flask(__name__)

# Gateway to the hashpickup.net OpenID provider. The OpenID store lives in
# the 'oid' directory under the application's instance path.
gidgw = AagidOpenidGateway(
    path=os.path.join(app.instance_path, 'oid'),
    prefix='http://auth.hashpickup.net/',
    endpoint='http://auth.hashpickup.net/openid',
)
@app.route('/')
def front():
    """Serve the landing page: a form with a single login button that
    posts to ``/try``."""
    fragments = [
        '<form method="post" action="/try">',
        '<input type="submit" name="submit" value="Login with ',
        'an Armagetron Advanced Global ID" />',
        '</form>',
    ]
    return ''.join(fragments)
@app.route('/try', methods=['POST'])
def try_():
    """Start authentication, asking to be sent back to the absolute URL of
    the ``return_point`` view afterwards."""
    callback_url = url_for('return_point', _external=True)
    return gidgw.request_auth(callback_url)
@app.route('/return')
def return_point():
    """Handle the provider's response and greet the authenticated user.

    Shows a greeting containing the identifier returned by
    ``gidgw.check_return()``, or a cancellation notice when that result is
    empty or None. Either way a link back to the front page is appended.
    """
    # Imported here so the block does not depend on a file-level import.
    try:
        from html import escape
    except ImportError:  # Python 2 has no ``html`` module
        from cgi import escape

    gid = gidgw.check_return()
    if gid:
        # The identifier is derived from a URL supplied by the provider and
        # is placed into an HTML page, so it must be escaped first.
        message = "Hello {0}!".format(escape(gid))
    else:
        message = "Authentication cancelled :("
    return '{0} <a href="/">Try again.</a>'.format(message)
if __name__ == '__main__':
    # Run the example directly. A fresh random secret key is generated on
    # every start, and debug mode is switched on before the server runs.
    app.secret_key = os.urandom(24)
    app.debug = True
    app.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment