Created
June 30, 2020 19:15
-
-
Save dpfoose/b4e23cf17533fc41ec40cb50041ee89c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
# Every setting is read from the process environment when this module is
# imported; a missing required variable raises KeyError at import time.

# Group whose members are treated as administrators (see ``User.admin``).
ADMIN_GROUP = os.environ['ADMIN_GROUP']
# Group whose members are allowed to log in (see ``User.is_active``).
AUTHORIZED_GROUP = os.environ['AUTHORIZED_GROUP']

# Key used to sign and verify the HS256 JSON Web Tokens. The login module reads
# it as ``config.SECRET`` but it was never defined here, so every token
# operation failed with AttributeError. It is read with ``.get`` so that
# importing this module keeps working for deployments that never issue tokens;
# issuing or checking a token still requires the variable to be set.
# NOTE(review): the environment variable name "SECRET" is assumed - confirm.
SECRET = os.environ.get('SECRET')

# LDAP connection settings
# URI handed to ``ldap.initialize``.
LDAP_URI = os.environ['LDAP_URI']
# Presumably the directory root DN; not referenced by the login or user-model
# code that accompanies this module.
LDAP_BASE_DN = os.environ['LDAP_BASE_DN']
# Search base for user entries, and the suffix of a full user DN.
LDAP_USER_BASE_DN = os.environ['LDAP_USER_BASE_DN']
# Search base for group entries, and the suffix of a full group DN.
LDAP_GROUP_BASE_DN = os.environ['LDAP_GROUP_BASE_DN']
# Attribute name that identifies a user, used as ``<LDAP_USER_ID>=<uid>``.
LDAP_USER_ID = os.environ['LDAP_USER_ID']
# Attribute name that identifies a group, used as ``<LDAP_GROUP_ID>=<gid>``.
LDAP_GROUP_ID = os.environ['LDAP_GROUP_ID']
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask_login import LoginManager | |
import datetime | |
from .ldap_user_model import get_user | |
import jwt | |
import ldap_flask_login_config as config | |
class LoginError(Exception):
    """Raised when a login attempt is rejected.

    Attributes:
        redirect_url: Optional URL stored on the exception for whoever
            handles it; ``None`` unless the raiser supplies one.
    """

    def __init__(self, message, redirect_url=None):
        """Store *message* as the exception text and remember *redirect_url*."""
        super().__init__(message)
        self.redirect_url = redirect_url
# Shared Flask-Login manager; the loaders below register themselves on it.
login_manager = LoginManager()

# Default login endpoint, plus a per-blueprint override for the "api"
# blueprint, which is pointed at 'api.unauthorized' instead.
login_manager.login_view = 'browser.browser_login'
login_manager.blueprint_login_views = {'api': 'api.unauthorized'}
@login_manager.user_loader
def user_loader(user_id):
    """Reload the user identified by a stored session id.

    Args:
        user_id: The uid previously returned by ``User.get_id``.

    Returns:
        The ``User`` when it still exists and belongs to
        ``config.AUTHORIZED_GROUP``; otherwise ``None``.
    """
    try:
        user = get_user(user_id)
    except IndexError:
        # get_user raises IndexError when the directory search finds nothing
        # (e.g. the account was deleted after the session was created).
        # A user loader should answer "no such user", not crash the request.
        return None
    return user if user.in_group(config.AUTHORIZED_GROUP) else None
@login_manager.request_loader
def request_loader(req):
    """Authenticate a request from the JWT in its ``Authorization`` header.

    The header is expected to look like ``<scheme> <token>``, where the token
    is the HS256-signed payload produced by ``get_jwt``.

    Args:
        req: The incoming request; only ``req.headers`` is read.

    Returns:
        The ``User`` named by the token's ``uid`` claim, or ``None`` when the
        header is missing or malformed, the token is invalid or expired, or
        no matching user exists.
    """
    auth_header = req.headers.get('Authorization')
    if not auth_header:
        return None

    parts = auth_header.split(' ')
    if len(parts) < 2:
        # No "<scheme> <token>" structure; treat as unauthenticated.
        return None

    try:
        # ``algorithms`` is a list of the accepted algorithm names.
        user_data = jwt.decode(parts[1], config.SECRET, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Covers bad signatures, malformed tokens and expired tokens.
        return None

    uid = user_data.get('uid')
    if isinstance(uid, (list, tuple)):
        # Tolerate tokens whose claim holds a list of values.
        uid = uid[0] if uid else None
    if not uid:
        return None

    # get_jwt stores the uid as a plain string, so it is used whole here;
    # indexing it with [0] would look up only its first character.
    # NOTE(review): unlike user_loader, membership of config.AUTHORIZED_GROUP
    # is not checked on this path - confirm that is intended.
    try:
        return get_user(uid)
    except IndexError:
        # The directory search found no such user.
        return None
def get_jwt(username, password):
    """Issue a 24-hour JSON Web Token for a user with valid credentials.

    Args:
        username: The uid to look up in the directory.
        password: The password to verify against the directory.

    Returns:
        The encoded token as ``str``. Its payload carries the user's ``uid``
        and an ``exp`` claim 24 hours in the future, signed with
        ``config.SECRET`` using HS256.

    Raises:
        LoginError: If the user does not exist or the password is wrong.
    """
    try:
        user = get_user(username)
    except IndexError:
        # Unknown user: report it exactly like a wrong password so the
        # response does not reveal which usernames exist.
        raise LoginError('Incorrect username/password.') from None

    if not user.check_password(password):
        raise LoginError('Incorrect username/password.')

    expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=24)
    token = jwt.encode({'uid': user.uid, 'exp': expires}, config.SECRET, algorithm='HS256')
    # PyJWT 1.x returns bytes here while 2.x returns str; accept both.
    return token.decode('utf-8') if isinstance(token, bytes) else token
def authenticate_user(req):
    """Verify the credentials carried by a login request.

    Credentials are taken from the form fields ``username`` and ``password``;
    when neither field is present the request body is parsed as JSON and the
    same two keys are read from it.

    Args:
        req: The incoming request; ``req.form`` and ``req.get_json`` are used.

    Returns:
        The authenticated ``User``.

    Raises:
        LoginError: If a credential is missing or empty, the user does not
            exist, or the password is wrong.
    """
    username = req.form.get('username')
    password = req.form.get('password')
    if username is None and password is None:
        # No form fields at all: the credentials were sent as a JSON body.
        req_json = req.get_json(force=True)
        if not isinstance(req_json, dict):
            req_json = {}
        username = req_json.get('username')
        password = req_json.get('password')

    # Both values must be non-empty strings. In particular an empty password
    # must never reach the LDAP bind, where it would not be verified.
    if not (isinstance(username, str) and isinstance(password, str)
            and username and password):
        raise LoginError('Incorrect username/password.')

    try:
        user = get_user(username)
    except IndexError:
        # Unknown user: same message as a wrong password.
        raise LoginError('Incorrect username/password.') from None

    if user.check_password(password):
        return user
    raise LoginError('Incorrect username/password.')
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ldap
from flask_login import UserMixin
from ldap.dn import escape_dn_chars
from ldap.filter import escape_filter_chars

import ldap_flask_login_config as config
def get_user_dn(uid, full=True):
    """Build the ``<LDAP_USER_ID>=<uid>`` name for a user.

    Args:
        uid: User identifier, inserted as given (no escaping is applied).
        full: When true, append ``,<LDAP_USER_BASE_DN>`` to form the complete
            DN; when false, return only the ``attribute=value`` part.

    Returns:
        The resulting string.
    """
    rdn = f'{config.LDAP_USER_ID}={uid}'
    if not full:
        return rdn
    return rdn + f',{config.LDAP_USER_BASE_DN}'
def get_group_dn(gid, full=True):
    """Build the ``<LDAP_GROUP_ID>=<gid>`` name for a group.

    Args:
        gid: Group identifier, inserted as given (no escaping is applied).
        full: When true, append ``,<LDAP_GROUP_BASE_DN>`` to form the complete
            DN; when false, return only the ``attribute=value`` part.

    Returns:
        The resulting string.
    """
    rdn = f'{config.LDAP_GROUP_ID}={gid}'
    if not full:
        return rdn
    return rdn + f',{config.LDAP_GROUP_BASE_DN}'
def resolve_gid_number(gid_number):
    """Return the group identifier of the group with the given ``gidNumber``.

    Args:
        gid_number: Numeric group id to search for under
            ``config.LDAP_GROUP_BASE_DN``.

    Returns:
        The first value of the group's ``config.LDAP_GROUP_ID`` attribute,
        decoded as UTF-8.

    Raises:
        IndexError: If no group carries that ``gidNumber``.
    """
    connection = ldap.initialize(config.LDAP_URI)
    try:
        results = connection.search_s(
            config.LDAP_GROUP_BASE_DN,
            ldap.SCOPE_SUBTREE,
            # Escape the value so it cannot alter the structure of the filter.
            f'(gidNumber={escape_filter_chars(str(gid_number))})',
        )
    finally:
        # Always release the connection, even when the search fails.
        connection.unbind_s()

    if not results:
        raise IndexError(f'no LDAP group has gidNumber {gid_number!r}')
    _, info = results[0]
    return info[config.LDAP_GROUP_ID][0].decode('utf-8')
def check_password(uid, password):
    """Check a user's password by attempting an LDAP simple bind.

    Args:
        uid: User identifier; the bind DN is built with ``get_user_dn``.
        password: Password to verify.

    Returns:
        ``True`` if the server accepts the bind, ``False`` if the password is
        empty or the server reports invalid credentials. Other LDAP errors
        propagate to the caller.
    """
    # A simple bind with a DN but no password is an "unauthenticated bind"
    # (RFC 4513, section 5.1.2); servers that allow it report success without
    # verifying anything, so an empty password must never be sent.
    if not password:
        return False

    connection = ldap.initialize(config.LDAP_URI)
    try:
        # Escape the uid so characters such as "," or "+" stay part of the
        # value instead of changing the DN that is bound.
        connection.bind_s(get_user_dn(escape_dn_chars(uid)), password)
    except ldap.INVALID_CREDENTIALS:
        return False
    else:
        return True
    finally:
        # Always release the connection, whatever the outcome of the bind.
        connection.unbind_s()
def _decode_ldap_value(raw):
    """Return *raw* decoded as UTF-8 text when possible, else unchanged."""
    if isinstance(raw, bytes):
        try:
            return raw.decode('utf-8')
        except UnicodeDecodeError:
            # Binary attribute (not valid UTF-8): keep the raw bytes rather
            # than failing the whole lookup.
            return raw
    return raw


def get_user(uid):
    """Look up a user entry in the directory and wrap it in a ``User``.

    Args:
        uid: Value of the ``config.LDAP_USER_ID`` attribute to search for
            under ``config.LDAP_USER_BASE_DN``.

    Returns:
        A ``User`` built from the first matching entry, with byte values
        decoded to ``str`` where they are valid UTF-8.

    Raises:
        IndexError: If no entry matches *uid*.
    """
    connection = ldap.initialize(config.LDAP_URI)
    try:
        results = connection.search_s(
            config.LDAP_USER_BASE_DN,
            ldap.SCOPE_SUBTREE,
            # Escape the value so input such as "*" or ")(" is matched
            # literally instead of being interpreted as filter syntax.
            f'({config.LDAP_USER_ID}={escape_filter_chars(str(uid))})',
        )
    finally:
        # Always release the connection, even when the search fails.
        connection.unbind_s()

    if not results:
        # Same exception type as indexing the empty result list would give,
        # but with a message that says what went wrong.
        raise IndexError(f'no LDAP user entry matches {uid!r}')

    dn, attributes = results[0]
    info = {
        key: [_decode_ldap_value(item) for item in value]
        if isinstance(value, list)
        else _decode_ldap_value(value)
        for key, value in attributes.items()
    }
    return User(dn, info)
def user_in_group(uid, gid):
    """Report whether a user is listed as a member of a group.

    Args:
        uid: User identifier to look for among the group's ``memberUid``
            values.
        gid: Group identifier; the group DN is built with ``get_group_dn``.

    Returns:
        ``True`` if the UTF-8 encoded *uid* appears in the group's
        ``memberUid`` attribute, ``False`` otherwise (including when the
        group has no ``memberUid`` attribute at all).
    """
    connection = ldap.initialize(config.LDAP_URI)
    try:
        results = connection.search_s(get_group_dn(gid), ldap.SCOPE_SUBTREE)
    finally:
        # Always release the connection, even when the search fails.
        connection.unbind_s()

    if not results:
        return False
    _, info = results[0]
    # The attribute is absent when the group has no members, so default to
    # an empty list instead of raising KeyError.
    return uid.encode('utf-8') in info.get('memberUid', [])
class User(UserMixin):
    """A directory user backed by the attributes of one LDAP entry.

    Attributes:
        dn: Distinguished name of the entry.
        info: Mapping of attribute name to a sequence of values, as built by
            ``get_user``.
    """

    def __init__(self, dn, info):
        """Keep the entry's DN and attribute mapping."""
        self.dn = dn
        self.info = info

    def _first(self, attribute):
        """Return the first value stored under *attribute* in ``info``."""
        return self.info[attribute][0]

    # --- attributes read straight from the entry ---

    @property
    def cn(self):
        """First ``cn`` value of the entry."""
        return self._first('cn')

    @property
    def uid(self):
        """First ``uid`` value of the entry."""
        return self._first('uid')

    @property
    def uid_number(self):
        """First ``uidNumber`` value, converted to ``int``."""
        return int(self._first('uidNumber'))

    @property
    def gid_number(self):
        """First ``gidNumber`` value, converted to ``int``."""
        return int(self._first('gidNumber'))

    # --- directory checks (each call performs an LDAP operation) ---

    def in_group(self, group_id):
        """Return whether this user is a member of the group *group_id*."""
        return user_in_group(self.uid, group_id)

    def check_password(self, password):
        """Return whether *password* is accepted for this user."""
        return check_password(self.uid, password)

    @property
    def admin(self):
        """Whether the user belongs to ``config.ADMIN_GROUP``."""
        return self.in_group(config.ADMIN_GROUP)

    # --- login-related members ---

    @property
    def is_active(self):
        """Whether the user belongs to ``config.AUTHORIZED_GROUP``."""
        return self.in_group(config.AUTHORIZED_GROUP)

    @property
    def is_anonymous(self):
        """Always ``False``."""
        return False

    def get_id(self):
        """Return the uid, which serves as the user's identifier."""
        return self.uid

    @property
    def id(self):
        """Alias for ``uid``."""
        return self.uid

    def to_dict(self):
        """Return the attribute mapping itself (not a copy)."""
        return self.info
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment