tony-landis (owner)

Revisions

gist: 122674 Download_button fork
public
Public Clone URL: git://gist.github.com/122674.git
Embed All Files: show embed
Python #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import logging
import md5
import os
import urllib
import urllib2

import simplejson as json_
from pylons import config, request, response, session, tmpl_context as c
from pylons.controllers.util import abort, redirect_to, url_for
from sqlalchemy import func, or_

from pylons_openid.lib.base import BaseController, render
from pylons_openid.model import User, UserOpenId, meta

log = logging.getLogger(__name__)

sa = meta.Session
 
class AuthController(BaseController):
    """Handle password logins, RPX (OpenID) logins and logout.

    Both login paths finish by storing ``user_id``, ``user_acl``,
    ``user_login`` and ``user_name`` in the session and redirecting to
    ``/root/index``.
    """

    # Session keys written at login; all of them are removed again at logout.
    _SESSION_KEYS = ('user_id', 'user_acl', 'user_login', 'user_name')

    def on_login(self):
        """Hook invoked after a successful login has populated the session.

        Does nothing by default; override in a subclass.
        """

    def on_logout(self, username):
        """Hook invoked after the session has been cleared at logout.

        ``username`` is the ``user_login`` value the session held, or None
        when nobody was logged in. Does nothing by default.
        """

    def get_loginform(self, username, msg="Enter login information", from_page="/"):
        """Render ``/login.html``.

        The arguments are exposed to the template as ``c.username``,
        ``c.alert`` and ``c.from_page``.
        """
        c.username = username
        c.alert = msg
        c.from_page = from_page
        return render("/login.html")

    def _client_ip(self):
        """Return the client address to record for the current request."""
        # WSGI exposes request headers under HTTP_* keys, so the proxy header
        # is HTTP_X_FORWARDED_FOR; a bare "X_FORWARDED_FOR" key is never set.
        # NOTE(review): this header is client-supplied unless a trusted proxy
        # overwrites it, so treat the stored value as informational only.
        forwarded = request.environ.get("HTTP_X_FORWARDED_FOR")
        if forwarded:
            # The header may list several hops ("client, proxy1, proxy2");
            # the first entry is the originating client.
            first_hop = forwarded.split(",")[0].strip()
            if first_hop:
                return first_hop
        return request.environ.get("REMOTE_ADDR")

    def _start_session(self, user):
        """Record login metadata on ``user``, fill the session and redirect.

        Shared by the password and RPX login paths so both store the same
        session values.
        """
        # set login date/ip
        user.ip = self._client_ip()
        user.dateLogin = func.now()
        user.sessionId = session.id
        # Flush so these changes are sent to the database the same way new
        # records are in rpx_token_url. NOTE(review): whether a commit is
        # also required depends on session configuration outside this file.
        sa.flush()

        # update the session
        session['user_id'] = int(user.id)
        session['user_acl'] = str(user.acl)
        session['user_login'] = user.username
        session['user_name'] = user.name
        session.save()

        self.on_login()
        return redirect_to('/root/index')

    def login(self):
        """Log in with a username and password.

        Re-renders the login form when either field is missing or when the
        credentials do not match a user. ``from_page`` is only echoed back to
        the form; a successful login always redirects to ``/root/index``.
        """
        username = request.params.get("username")
        password = request.params.get("password")
        from_page = request.params.get("from_page") or "/"
        if not username or not password:
            # Keep whatever the visitor already typed instead of resetting it.
            return self.get_loginform(username or "", from_page=from_page)

        # md5 works on bytes; a non-ASCII unicode password would otherwise
        # raise UnicodeEncodeError instead of simply failing to match.
        if isinstance(password, unicode):
            password = password.encode("utf-8")
        # NOTE(review): unsalted MD5 is a weak password hash. It is kept
        # because the comparison is against digests stored in User.password.
        digest = md5.new(password).hexdigest()

        # One query instead of count() followed by one().
        user = sa.query(User).\
            filter(User.username == username).\
            filter(User.password == digest).\
            first()
        if user is None:
            return self.get_loginform(username, "Invalid Credentials", from_page)

        return self._start_session(user)

    def logout(self):
        """Clear the login state from the session and show the login page."""
        username = session.get('user_login')
        # Remove every key that login stored, not only user_id, so no stale
        # ACL or name survives the logout.
        for key in self._SESSION_KEYS:
            if key in session:
                del session[key]
        session.save()
        self.on_logout(username)
        return redirect_to("/auth/login")

    def _fetch_rpx_profile(self, token):
        """Exchange an RPX token for the profile dict.

        Returns None when the request fails, the reply is not valid JSON, or
        the reply's ``stat`` is not ``"ok"``.
        """
        if isinstance(token, unicode):
            token = token.encode("utf-8")
        # URL-encode the values so a crafted token cannot append extra query
        # parameters to the API call.
        query = urllib.urlencode([
            ('token', token),
            ('apiKey', config.get('rpx_token')),
        ])
        url = "https://rpxnow.com/api/v2/auth_info?" + query
        try:
            handle = urllib2.urlopen(url)
            try:
                reply = json_.loads(handle.read())
            finally:
                handle.close()
        except (IOError, ValueError):
            # urllib2.URLError is an IOError; malformed JSON raises ValueError.
            log.exception("RPX auth_info lookup failed")
            return None
        if not isinstance(reply, dict) or reply.get('stat') != "ok":
            return None
        return reply.get("profile")

    def rpx_token_url(self, *args, **kargs):
        """Complete an RPX login from the ``token`` request parameter.

        Looks up the profile for the token, finds or creates the matching
        ``User`` and ``UserOpenId`` records, then starts the session. Any
        failure redirects back to the login page.
        """
        login_url = url_for(controller="auth", action="login")
        token = request.params.get('token')
        if not token:
            return redirect_to(login_url)

        # contact rpx for the details:
        profile = self._fetch_rpx_profile(token)
        if not profile or not profile.get('identifier'):
            # something bad happened
            return redirect_to(login_url)

        identifier = profile['identifier']
        # Providers may omit the optional profile fields, so read them with
        # .get() rather than indexing.
        verified_email = profile.get('verifiedEmail')
        user = None

        # check if openid user already in the db
        # Match on the stored identifier: display-level fields such as
        # preferredUsername can collide between different people.
        openid = sa.query(UserOpenId).\
            filter(UserOpenId.identifier == identifier).\
            first()
        if openid is not None:
            user = openid.user
        elif verified_email:
            # no openid record exists, check if old user exists with verifiedEmail...
            matches = sa.query(User).filter(User.username == verified_email)
            if matches.count() == 1:
                # one exact match
                user = matches.one()

        # create user?
        if user is None:
            # Random password: one derived from the profile contents could be
            # reconstructed by anyone who knows the profile data.
            # NOTE(review): 'Cusomer' looks like a typo for 'Customer'; left
            # unchanged because other code may compare against this value.
            user = User(
                username=verified_email or identifier,
                password=md5.new(os.urandom(32)).hexdigest(),
                acl='Cusomer',
                name=profile.get('displayName'))
            sa.save(user)
            sa.flush()

        # create openid
        if openid is None:
            openid = UserOpenId(
                verifiedEmail=verified_email,
                displayName=profile.get('displayName'),
                preferredUsername=profile.get('preferredUsername'),
                providerName=profile.get('providerName'),
                identifier=identifier,
                email=profile.get('email'),
                user_id=user.id)
            sa.save(openid)
            sa.flush()

        return self._start_session(user)