Created
September 22, 2017 16:35
-
-
Save adiroiban/cc60a78e0275e07869e3ed1fb0114621 to your computer and use it in GitHub Desktop.
ldaptor LDAP server faking AD UPN BIND
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
An ldaptor LDAP server which can authenticate a BIND request by UPN, as AD does.

For a UPN BIND to succeed, the LDAP entry needs to have the
`userPrincipalName` attribute set.
"""
from __future__ import absolute_import | |
from ldaptor import interfaces | |
from ldaptor.protocols import pureldap | |
from ldaptor.protocols.ldap import distinguishedname, ldaperrors | |
from twisted.internet import defer | |
from ldaptor.protocols.ldap.ldapserver import LDAPServer | |
class ADLDAPServer(LDAPServer):
    """
    An LDAP server which supports BIND using a UPN, similar to AD.

    A bind name that contains ``@`` and no ``,`` is treated as a possible
    UPN: the directory is searched for an entry whose `_loginAttribute`
    equals that name and, when exactly one entry matches, the BIND is
    performed against that entry. Any other bind name is handled as a
    regular distinguished name.
    """

    # Attribute matched against the bind name for UPN-style binds.
    _loginAttribute = b'userPrincipalName'

    def handle_LDAPBindRequest(self, request, controls, reply):
        """
        Handle a BIND request, accepting either a DN or a UPN as bind name.

        @param request: The BIND request; `version`, `dn` and `auth` are
            read from it.
        @param controls: Request controls, passed to `checkControls`.
        @param reply: Not used by this handler.

        @return: An `LDAPBindResponse` for an anonymous bind (empty bind
            name), otherwise a deferred which fires with the
            `LDAPBindResponse` once the entry accepted the credentials.
            The deferred fails with `LDAPInvalidCredentials` when the
            lookup of the bind entry fails with `LDAPNoSuchObject`.

        @raise ldaperrors.LDAPProtocolError: When the request is not for
            LDAP version 3.
        """
        if request.version != 3:
            raise ldaperrors.LDAPProtocolError(
                'Version %u not supported' % request.version)

        self.checkControls(controls)

        bindName = request.dn
        if not bindName:
            # Anonymous bind. Testing truthiness instead of comparing with
            # '' works for both bytes and text bind names.
            self.boundUser = None
            return pureldap.LDAPBindResponse(resultCode=0)

        root = interfaces.IConnectedLDAPEntry(self.factory)

        d = self._resolveBindDN(root, bindName)
        # Once we know the BIND DN, search for the LDAP entry.
        d.addCallback(root.lookup)

        def _noEntry(fail):
            # A missing entry is reported as bad credentials by _gotEntry,
            # any other failure is propagated unchanged.
            fail.trap(ldaperrors.LDAPNoSuchObject)
            return None
        d.addErrback(_noEntry)

        def _gotEntry(entry, auth):
            if entry is None:
                raise ldaperrors.LDAPInvalidCredentials

            bound = entry.bind(auth)

            def _cb(entry):
                self.boundUser = entry
                return pureldap.LDAPBindResponse(
                    resultCode=ldaperrors.Success.resultCode,
                    matchedDN=str(entry.dn))
            bound.addCallback(_cb)
            return bound
        d.addCallback(_gotEntry, request.auth)

        return d

    def _resolveBindDN(self, root, bindName):
        """
        Return a deferred firing with the DN to use for the BIND.

        @param root: The connected entry used as base for the UPN search.
        @param bindName: The non-empty bind name sent by the client.
        """
        # Pick separators of the same type as the bind name so that the
        # membership tests work for bytes as well as for text.
        if isinstance(bindName, bytes):
            atSign, comma = b'@', b','
        else:
            atSign, comma = u'@', u','

        if atSign not in bindName or comma in bindName:
            # Looks like a regular DN.
            return defer.succeed(
                distinguishedname.DistinguishedName(bindName))

        # This might be a UPN request.
        # The bind name is untrusted client input, so the filter is built
        # as an object rather than as filter text: the name is matched as
        # a literal value and characters like `*`, `(`, `)` or `\` can't
        # change the meaning of the filter.
        upnFilter = pureldap.LDAPFilter_equalityMatch(
            attributeDesc=pureldap.LDAPAttributeDescription(
                self._loginAttribute),
            assertionValue=pureldap.LDAPAssertionValue(bindName))

        def _gotUPNResult(results):
            if len(results) != 1:
                # Not a UPN (no match, or an ambiguous one), so fall back
                # to handling the bind name as a DN.
                # NOTE(review): confirm how DistinguishedName reacts to a
                # value without `=`; that failure is not trapped here.
                return distinguishedname.DistinguishedName(bindName)
            # We got a single result, so the UPN exists.
            return results[0].dn

        d = root.search(filterObject=upnFilter)
        d.addCallback(_gotUPNResult)
        return d
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment