Skip to content

Instantly share code, notes, and snippets.

@dpfoose
Created June 30, 2020 19:15
Show Gist options
  • Save dpfoose/b4e23cf17533fc41ec40cb50041ee89c to your computer and use it in GitHub Desktop.
Save dpfoose/b4e23cf17533fc41ec40cb50041ee89c to your computer and use it in GitHub Desktop.
import os
ADMIN_GROUP = os.environ['ADMIN_GROUP']
AUTHORIZED_GROUP = os.environ['AUTHORIZED_GROUP']
# LDAP connection settings
LDAP_URI = os.environ['LDAP_URI']
LDAP_BASE_DN = os.environ['LDAP_BASE_DN']
LDAP_USER_BASE_DN = os.environ['LDAP_USER_BASE_DN']
LDAP_GROUP_BASE_DN = os.environ['LDAP_GROUP_BASE_DN']
LDAP_USER_ID = os.environ['LDAP_USER_ID']
LDAP_GROUP_ID = os.environ['LDAP_GROUP_ID']
from flask_login import LoginManager
import datetime
from .ldap_user_model import get_user
import jwt
import ldap_flask_login_config as config
class LoginError(Exception):
def __init__(self, message, redirect_url=None):
super(LoginError, self).__init__(message)
self.redirect_url = redirect_url
login_manager = LoginManager()
login_manager.login_view = 'browser.browser_login'
login_manager.blueprint_login_views = {
'api': 'api.unauthorized'
}
@login_manager.user_loader
def user_loader(user_id):
user = get_user(user_id)
if user.in_group(config.AUTHORIZED_GROUP):
return user
return
@login_manager.request_loader
def request_loader(req):
if 'Authorization' in req.headers:
auth_header = req.headers.get('Authorization')
token = auth_header.split(' ')[1]
user_data = jwt.decode(token, config.SECRET, algorithms='HS256')
print(user_data)
user = get_user(user_data['uid'][0])
print(user.to_dict())
return user
def get_jwt(username, password):
user = get_user(username)
if user.check_password(password):
user_data = {'uid': user.uid, 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)}
return jwt.encode(user_data, config.SECRET, algorithm='HS256').decode('utf-8')
raise LoginError('Incorrect username/password.')
def authenticate_user(req):
print('authenticate_user')
username = req.form.get('username')
password = req.form.get('password')
if username is None and password is None:
print('loading json')
req_json = req.get_json(force=True)
username = req_json['username']
password = req_json['password']
user = get_user(username)
if user.check_password(password):
return user
raise LoginError('Incorrect username/password.')
import ldap
import ldap_flask_login_config as config
def get_user_dn(uid, full=True):
dn = f'{config.LDAP_USER_ID}={uid}'
if full:
dn += f',{config.LDAP_USER_BASE_DN}'
return dn
def get_group_dn(gid, full=True):
dn = f'{config.LDAP_GROUP_ID}={gid}'
if full:
dn += f',{config.LDAP_GROUP_BASE_DN}'
return dn
def resolve_gid_number(gid_number):
connection = ldap.initialize(config.LDAP_URI)
dn, info = connection.search_s(config.LDAP_GROUP_BASE_DN, ldap.SCOPE_SUBTREE, f'gidNumber={gid_number}')[0]
return info[config.LDAP_GROUP_ID][0].decode('utf-8')
def check_password(uid, password):
connection = ldap.initialize(config.LDAP_URI)
try:
connection.bind_s(get_user_dn(uid), password)
return True
except ldap.INVALID_CREDENTIALS:
return False
def get_user(uid):
connection = ldap.initialize(config.LDAP_URI)
dn, info = connection.search_s(config.LDAP_USER_BASE_DN, ldap.SCOPE_SUBTREE, get_user_dn(uid, False))[0]
info = {
key: [
val.decode('utf-8') for val in value
] if isinstance(value, list)
else value.decode('utf-8') if isinstance(value, bytes)
else value for key, value in info.items()
}
return User(dn, info)
def user_in_group(uid, gid):
connection = ldap.initialize(config.LDAP_URI)
dn, info = connection.search_s(get_group_dn(gid), ldap.SCOPE_SUBTREE)[0]
return uid.encode('utf-8') in info['memberUid']
class User(UserMixin):
def __init__(self, dn, info):
self.dn = dn
self.info = info
def in_group(self, group_id):
return user_in_group(self.uid, group_id)
def check_password(self, password):
return check_password(self.uid, password)
@property
def cn(self):
return self.info['cn'][0]
@property
def uid(self):
return self.info['uid'][0]
@property
def uid_number(self):
return int(self.info['uidNumber'][0])
@property
def gid_number(self):
return int(self.info['gidNumber'][0])
@property
def is_active(self):
return self.in_group(config.AUTHORIZED_GROUP)
@property
def is_anonymous(self):
return False
def get_id(self):
return self.uid
@property
def admin(self):
return self.in_group(config.ADMIN_GROUP)
@property
def id(self):
return self.uid
def to_dict(self):
return self.info
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment