Skip to content

Instantly share code, notes, and snippets.

@dsoprea
Last active June 23, 2018 18:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dsoprea/3ae3f2b085dc0cb30294e4dd6eb308df to your computer and use it in GitHub Desktop.
Save dsoprea/3ae3f2b085dc0cb30294e4dd6eb308df to your computer and use it in GitHub Desktop.
LDAP Lookups Against Active Directory with Python
import logging
import ldap
import collections
# Install:
#
# apt: libsasl2-dev
# pip: python-ldap
#
_USER_QUERY = '(&(objectClass=USER)(sAMAccountName={username}))'
_GROUP_QUERY = '(&(objectClass=GROUP)(cn={group_name}))'
_LOGGER = logging.getLogger(__name__)
_USER = \
collections.namedtuple(
'_USER', [
'dn',
'attributes',
'groups',
])
class NotFoundException(Exception):
pass
class LdapAdapter(object):
def __init__(self, host_and_port, username, password, base_dn):
self.__host_and_port = host_and_port
self.__username = username
self.__password = password
self.__base_dn = base_dn
self.__raw_resource = None
@property
def __resource(self):
if self.__raw_resource is None:
self.__raw_resource = self.__auth()
return self.__raw_resource
def __auth(self):
conn = ldap.initialize('ldap://' + self.__host_and_port)
conn.protocol_version = 3
conn.set_option(ldap.OPT_REFERRALS, 0)
try:
conn.simple_bind_s(self.__username, self.__password)
except ldap.INVALID_CREDENTIALS:
# Pinned, for the future.
raise
except ldap.SERVER_DOWN:
# Pinned, for the future.
raise
except ldap.LDAPError:
_LOGGER.error("LDAP error content:\n{}".format(e.message))
if issubclass(e.message.__class__, dict) is True and \
'desc' in e.message:
raise Exception("LDAP: {}".format(e.message['desc']))
raise
else:
return conn
def get_dn_by_username(self, username):
"""Return user information. See _USER."""
uf = _USER_QUERY.format(username=username)
results_raw = \
self.__resource.search_s(
self.__base_dn,
ldap.SCOPE_SUBTREE,
uf)
if not results_raw:
raise NotFoundException(username)
results = []
for dn, attributes in results_raw:
if dn is None:
continue
u = _USER(
dn=dn,
attributes=attributes,
groups=attributes['memberOf'])
results.append(u)
assert \
len(results) == 1, \
"More than one result was found for user [{}], which doesn't " \
"make sense: {}".format(username, results)
return results[0]
def get_group_members(self, group_name):
"""Return a list of DNs."""
gf = _GROUP_QUERY.format(group_name=group_name)
results_raw = \
self.__resource.search_s(
self.__base_dn,
ldap.SCOPE_SUBTREE,
gf)
if not results_raw:
raise NotFoundException(group_name)
collections = []
for dn, attributes in results_raw:
if dn is None:
continue
collections.append(attributes['member'])
if not collections:
raise NotFoundException(group_name)
assert \
len(collections) == 1, \
"Too many sets of results returned: {}".format(collections)
return collections[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment