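class GroupMirrorLDAP(BaseBackend):
    """
    Django auth backend that delegates the credential check to LDAPBackend and
    then mirrors the user's LDAP ``groupMembership`` into Django ``Group`` rows.

    Flow (see ``authenticate``): LDAPBackend validates the password; the
    service account resolves the user's DN; a second bind *as the user* reads
    the membership attribute; ``update_my_groups`` creates any missing
    ``Group`` and replaces the user's group set with the LDAP list.
    """

    # Search base used by every lookup in this backend.
    BASE_DN = "o=auth"

    # RFC 4515 escaping for values interpolated into a search filter.  Without
    # it a username such as ``*)(objectClass=*`` would rewrite the filter.
    _FILTER_ESCAPE_TABLE = str.maketrans(
        {
            "\\": r"\5c",
            "*": r"\2a",
            "(": r"\28",
            ")": r"\29",
            "\x00": r"\00",
        }
    )

    # ---------------------------------------------------------
    @classmethod
    def _escape_filter_value(cls, value):
        """Return ``value`` with LDAP filter metacharacters escaped (RFC 4515)."""
        return str(value).translate(cls._FILTER_ESCAPE_TABLE)

    # ---------------------------------------------------------
    def _search_uid(self, conn, uid):
        """
        Search ``BASE_DN`` for the entry whose ``uid`` equals ``uid``.

        Args:
            conn: a bound ``ldap`` connection.
            uid: the login name; escaped before being placed in the filter.

        Returns:
            The first ``(dn, attrs)`` tuple the server returned.

        Raises:
            LookupError: no entry matched.
        """
        search_filter = f"(uid={self._escape_filter_value(uid)})"
        result = conn.search_s(self.BASE_DN, ldap.SCOPE_SUBTREE, search_filter)
        if not result:
            raise LookupError(f"no LDAP entry with uid={uid!r} under {self.BASE_DN}")
        # NOTE(review): if several entries share a uid only the first one is
        # used, as in the original code — confirm uid is unique in the directory.
        return result[0]

    # ---------------------------------------------------------
    def get_full_cn(self, cwid):
        """
        Resolve the full DN of the Django user ``cwid`` via the service account.

        The bind DN/password come from ``settings.AUTH_LDAP_BIND_DN`` and
        ``settings.AUTH_LDAP_BIND_PASSWORD``; the connection is always unbound
        afterwards so a failed search does not leak a socket.

        Raises:
            LookupError: ``cwid`` matched no entry.
            ldap.LDAPError: bind or search failed.
        """
        conn = ldap.initialize(settings.AUTH_LDAP_SERVER_URI)
        try:
            conn.simple_bind_s(settings.AUTH_LDAP_BIND_DN, settings.AUTH_LDAP_BIND_PASSWORD)
            dn, _attrs = self._search_uid(conn, cwid)
        finally:
            conn.unbind_s()
        return dn

    # ---------------------------------------------------------
    def get_my_groups(self, username, password):
        """
        Return the sorted group names found in the user's own ``groupMembership``.

        Binds to the directory *as the user* (after resolving the DN with the
        service account) so the read is done with the user's own rights.

        Args:
            username: the login name (uid).
            password: the user's password; must be non-empty.

        Returns:
            Sorted list of the RDN values (e.g. the ``cn=`` part) of each
            membership DN.  A user with no ``groupMembership`` attribute yields
            ``[]``.

        Raises:
            ValueError: ``password`` is empty.
            LookupError: the user has no entry.
            ldap.LDAPError: bind or search failed.
        """
        # An empty password would make simple_bind_s an unauthenticated bind
        # on many servers (RFC 4513 §5.1.2); refuse rather than read as "anonymous".
        if not password:
            raise ValueError("a non-empty password is required to bind as the user")
        user_dn = self.get_full_cn(username)
        conn = ldap.initialize(settings.AUTH_LDAP_SERVER_URI)
        try:
            conn.simple_bind_s(user_dn, password)
            _dn, attrs = self._search_uid(conn, username)
        finally:
            conn.unbind_s()
        # NOTE(review): a missing attribute is treated as "member of nothing";
        # update_my_groups will then clear the Django groups — confirm intended.
        memberships = [raw.decode("UTF-8") for raw in attrs.get("groupMembership", [])]
        names = []
        for member_dn in memberships:
            # Keep only the first RDN and only its value; maxsplit=1 so a
            # value containing "=" is not truncated.  Escaped commas inside the
            # value (``cn=a\,b,...``) are not handled here, same as before.
            first_rdn = member_dn.split(",")[0]
            names.append(first_rdn.split("=", 1)[1])
        return sorted(names)

    # ---------------------------------------------------------
    def update_my_groups(self, user, ldap_grouplist):
        """
        Create any ``Group`` missing from ``ldap_grouplist`` and make it the
        user's complete group set (groups not in the list are removed).

        Args:
            user: the Django user whose ``groups`` relation is replaced.
            ldap_grouplist: iterable of group names.
        """
        wanted = set(ldap_grouplist)
        # Query only the names we care about instead of loading every Group row.
        present = set(Group.objects.filter(name__in=wanted).values_list("name", flat=True))
        for name in sorted(wanted - present):
            Group.objects.get_or_create(name=name)
        user.groups.set(Group.objects.filter(name__in=wanted))

    # ---------------------------------------------------------
    def authenticate(self, request, username=None, password=None, **kwargs):
        """
        Authenticate through ``LDAPBackend``; on success mirror the LDAP groups.

        Returns the user object from ``LDAPBackend`` or ``None``.  Group
        mirroring is best-effort in ``prod`` (``settings.VARREP_ENVIRONMENT``):
        a failure is logged and the login still succeeds with the user's
        previous Django groups; elsewhere the error propagates.
        """
        import logging

        ldapuser = LDAPBackend().authenticate(request, username, password)
        if not ldapuser:
            return None
        try:
            self.update_my_groups(ldapuser, self.get_my_groups(username, password))
        except Exception:
            if settings.VARREP_ENVIRONMENT != "prod":
                raise
            # Silently keeping stale groups would hide a broken mirror; record it.
            logging.getLogger(__name__).exception(
                "LDAP group mirroring failed for user %r; existing groups kept", username
            )
        return ldapuser

    # ---------------------------------------------------------
    def get_user(self, user_id):
        """
        Look up a user by primary key for the auth framework.

        Returns ``None`` when no such user exists, as Django expects from a
        backend's ``get_user`` (a raised ``DoesNotExist`` would surface as a 500).
        """
        try:
            return User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return None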