Skip to content

Instantly share code, notes, and snippets.

@tony-landis
Created June 2, 2009 23:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save tony-landis/122674 to your computer and use it in GitHub Desktop.
Save tony-landis/122674 to your computer and use it in GitHub Desktop.
import logging
from pylons import config, request, response, session, tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from pylons_openid.lib.base import BaseController, render
log = logging.getLogger(__name__)
import urllib2
import md5
import simplejson as json_
from pylons_openid.model import User, UserOpenId, meta
from sqlalchemy import func, or_
sa = meta.Session
class AuthController(BaseController):
    """Password and RPX (OpenID) authentication actions.

    A successful login records the client IP, the login time and the session
    id on the ``User`` row and stores the user's id, ACL, login name and
    display name in the session under the keys listed in ``_SESSION_KEYS``.
    """

    # Session keys written on login; all of them are removed again on logout.
    _SESSION_KEYS = ('user_id', 'user_acl', 'user_login', 'user_name')

    _RPX_AUTH_INFO_URL = "https://rpxnow.com/api/v2/auth_info"

    def on_login(self):
        """Hook invoked after a successful login; does nothing by default."""

    def on_logout(self, username):
        """Hook invoked on logout of a logged-in user; does nothing by default.

        ``username`` is the login name that was stored in the session.
        """

    def get_loginform(self, username, msg="Enter login information", from_page="/"):
        """Render the login form.

        Exposes ``username``, ``msg`` (as ``c.alert``) and ``from_page`` to
        the template context and returns the rendered "/login.html".
        """
        c.username = username
        c.alert = msg
        c.from_page = from_page
        return render("/login.html")

    def login(self):
        """Log in with a username/password pair taken from the request.

        Re-renders the login form when either field is missing or when the
        credentials do not match; otherwise starts the login session and
        redirects to "/root/index".
        """
        params = request.params
        username = params.get("username")
        password = params.get("password")
        # Fall back to the form's own default instead of handing None on.
        from_page = params.get("from_page") or "/"

        if not username or not password:
            # Keep what the visitor already supplied instead of dropping it.
            return self.get_loginform(username or "", from_page=from_page)

        # One query instead of count() followed by one(); first() also does
        # not raise when several rows happen to match.
        user = sa.query(User).filter(User.username == username).\
            filter(User.password == self._hash_password(password)).\
            first()
        if user is None:
            return self.get_loginform(username, "Invalid Credentials", from_page)

        self._establish_login(user)
        redirect_to('/root/index')

    def logout(self):
        """Drop the login data from the session and redirect to the login page."""
        was_logged_in = session.get('user_id', None) is not None
        username = session.get('user_login', None)
        for key in self._SESSION_KEYS:
            # Assign before deleting so that an absent key cannot raise.
            session[key] = None
            del session[key]
        session.save()
        if was_logged_in:
            self.on_logout(username)
        redirect_to("/auth/login")

    def rpx_token_url(self, *args, **kargs):
        """Finish an RPX sign-in using the ``token`` request parameter.

        Looks the token up at RPX, finds or creates the matching ``User`` and
        ``UserOpenId`` rows, starts the login session and redirects to
        "/root/index".  Any failure (no token, failed lookup, profile without
        the fields needed here, ambiguous match) redirects to the login page.
        """
        token = request.params.get('token')
        if not token:
            return redirect_to(url_for(controller="auth", action="login"))

        profile = self._fetch_rpx_profile(token)
        if profile is None:
            # something bad happened
            return redirect_to(url_for(controller='auth', action='login'))

        verified_email = profile.get('verifiedEmail')
        provider = profile.get('providerName')
        identifier = profile.get('identifier')
        preferred_username = profile.get('preferredUsername')
        if not verified_email or not provider or not identifier:
            # The verified e-mail doubles as the local username, so without
            # it (or the provider/identifier) no account can be matched or
            # created.
            log.warning("RPX profile lacks verifiedEmail, providerName or identifier")
            return redirect_to(url_for(controller='auth', action='login'))

        # check if openid user already in the db
        # Only compare against values that are present: a missing value
        # compared with == would turn into IS NULL and match unrelated rows.
        # NOTE(review): preferredUsername is probably not unique per provider;
        # matching on `identifier` looks safer -- confirm against the data.
        criteria = [UserOpenId.verifiedEmail == verified_email]
        if preferred_username:
            criteria.append(UserOpenId.preferredUsername == preferred_username)
        matches = sa.query(UserOpenId).\
            filter(or_(*criteria)).\
            filter(UserOpenId.providerName == provider).\
            all()
        if len(matches) > 1:
            # Refuse to guess which account is meant.
            log.warning("Ambiguous OpenID match for provider %s", provider)
            return redirect_to(url_for(controller='auth', action='login'))

        user = None
        openid = None
        if matches:
            openid = matches[0]
            user = openid.user
        else:
            # no openid record exists, check if old user exists with verifiedEmail...
            existing = sa.query(User).filter(User.username == verified_email).all()
            if len(existing) == 1:
                # one exact match
                user = existing[0]

        # create user?
        if user is None:
            # NOTE(review): 'Cusomer' looks like a typo for 'Customer', but it
            # is stored data that other code may compare against -- confirm
            # before changing it.
            user = User(
                username=verified_email,
                password=self._random_password_hash(),
                acl='Cusomer',
                name=profile.get('displayName') or preferred_username or verified_email)
            sa.save(user)
            sa.flush()

        # create openid
        if openid is None:
            openid = UserOpenId(
                verifiedEmail=verified_email,
                displayName=profile.get('displayName'),
                preferredUsername=preferred_username,
                providerName=provider,
                identifier=identifier,
                email=profile.get('email'),
                user_id=user.id)
            sa.save(openid)
            sa.flush()

        self._establish_login(user)
        redirect_to("/root/index")

    def _establish_login(self, user):
        """Record the login on ``user``, fill the session and fire ``on_login``."""
        # set login date/ip
        # NOTE(review): nothing here flushes or commits these attribute
        # changes; presumably the surrounding request handling does -- confirm.
        user.ip = self._client_ip()
        user.dateLogin = func.now()
        user.sessionId = session.id
        # update the session
        session['user_id'] = int(user.id)
        session['user_acl'] = str(user.acl)
        session['user_login'] = user.username
        session['user_name'] = user.name
        session.save()
        self.on_login()

    def _client_ip(self):
        """Return the client address, preferring a forwarded-for header.

        WSGI exposes request headers as ``HTTP_*`` environ keys, so the header
        is read from ``HTTP_X_FORWARDED_FOR``; the bare ``X_FORWARDED_FOR``
        key is still honoured in case something else sets it.  The header is
        client-controlled unless a trusted proxy overwrites it, so the value
        is informational only.
        """
        environ = request.environ
        forwarded = environ.get("HTTP_X_FORWARDED_FOR") or environ.get("X_FORWARDED_FOR")
        if forwarded:
            # The header may hold a comma-separated chain; the first entry is
            # the originating client.
            return forwarded.split(",")[0].strip()
        return environ.get("REMOTE_ADDR")

    def _hash_password(self, password):
        """Return the hex MD5 digest under which passwords are stored.

        Unicode input is encoded as UTF-8 first, because ``md5.new`` raises
        on non-ASCII unicode.
        NOTE(review): unsalted MD5 is weak for password storage; it is kept
        because the stored hashes would otherwise have to be migrated.
        """
        if isinstance(password, unicode):
            password = password.encode('utf-8')
        return md5.new(password).hexdigest()

    def _random_password_hash(self):
        """Return the hash of a random, unguessable password.

        Used for accounts created through OpenID: a password derived from the
        profile data could be reconstructed and used on the password form.
        """
        import os
        return md5.new(os.urandom(32)).hexdigest()

    def _fetch_rpx_profile(self, token):
        """Return the RPX profile dict for ``token``, or None on any failure."""
        import urllib

        if isinstance(token, unicode):
            token = token.encode('utf-8')
        # The token comes straight from the request, so it must be URL-encoded
        # rather than pasted into the query string.
        query = urllib.urlencode([
            ('token', token),
            ('apiKey', config.get('rpx_token')),
        ])
        # contact rpx for the details:
        try:
            reply = urllib2.urlopen(self._RPX_AUTH_INFO_URL + "?" + query)
            try:
                payload = json_.loads(reply.read())
            finally:
                reply.close()
        except (IOError, ValueError):
            # urllib2.URLError is an IOError; a malformed body raises ValueError.
            log.exception("RPX auth_info lookup failed")
            return None

        if not isinstance(payload, dict) or payload.get('stat') != "ok":
            return None
        profile = payload.get('profile')
        if not isinstance(profile, dict):
            return None
        return profile
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment