Skip to content

Instantly share code, notes, and snippets.

@kwharrigan
Forked from bollwyvl/authen_msad.py
Created July 27, 2012 20:41
Show Gist options
  • Save kwharrigan/3190379 to your computer and use it in GitHub Desktop.
Save kwharrigan/3190379 to your computer and use it in GitHub Desktop.
A Python port of Perl's Apache::AuthenMSAD
"""
authen_msad
A port of Perl's Apache::AuthenMSAD
Takes advantage of Microsoft Active Directory allowing a user to be verified
with 'user@domain' instead of searching for the distinguished name.
To merge as seamlessly as possible with existing systems (i.e. SharePoint,
etc.) munge the incoming "domain\user" into "user@domain".
Implementations for both mod_wsgi and the deprecated mod_python are included.
Authors:
bollwyvl - Initial implementation
kwharrigan - Added TLS
Some caching code (modified to use shelve) due to Reed L. O'Brien
"""
from os import urandom
import ldap
import shelve
import hashlib
import time
CACHE_LOCATION = 'snip'
CACHE_TIMEOUT = 60
cache = shelve.open(CACHE_LOCATION, writeback=True)
def debug(env, string):
print >> env['wsgi.errors'], string
def check_password(environ, user, password):
"""
This is the handler used by mod_wsgi.
Sample httpd.conf entry:
<Location /directory.to.protect/>
AuthType Basic
AuthName "This Content is Protected"
Require valid-user
AuthBasicProvider wsgi
WSGIAuthUserScript /path/to/authen_msad.py
SetEnv authen_msad.ldap_url ldap://<server ip address>:1234/
</Location>
"""
try:
# Setenv doesn't do what we want, so we have to hard code ths in
domain, user = user.split("\\")
con_options = {}
ldap_url = 'ldap://snip'
use_tls = True
cacertfile = '/etc/pki/tls/certs/snip.cer'
newctx = '0'
tls_demand = '1'
if cacertfile:
con_options[ldap.OPT_X_TLS_CACERTFILE] = cacertfile
if newctx:
con_options[ldap.OPT_X_TLS_NEWCTX] = int(newctx)
if tls_demand:
con_options[ldap.OPT_X_TLS_DEMAND] = int(tls_demand)
debug(environ,'Checking cache')
if checkCache(environ, user, password):
return True
except:
return None
debug(environ, 'Checking MSAD')
return msad_check_user(user=user,
domain=domain,
password=password,
ldap_url=ldap_url,
use_tls=use_tls,
con_options=con_options,
environ=environ,
errors={
ldap.INVALID_CREDENTIALS: None,
ldap.SERVER_DOWN: None,
ldap.CONNECT_ERROR: None
})
def msad_check_user(user, domain, password, ldap_url, con_options={}, use_tls=True, environ={}, errors={}):
"""
The bit that actually talks to the ActiveDirectory server.
"""
try:
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255)
# http://helpful.knobs-dials.com/index.php/Python-ldap_noteS
ldap.set_option(ldap.OPT_REFERRALS, 0)
ldap_client = ldap.initialize(ldap_url)
ldap_client.protocol_version = ldap.VERSION3
for key, item in con_options.items():
debug(environ, 'Setting %d to %s' % (key, item))
ldap_client.set_option(int(key), item)
if use_tls:
debug(environ, 'Enabling TLS')
error_code = ldap_client.start_tls_s()
debug(environ, 'TLS Enabled')
debug(environ, 'Binding')
ldap_client.simple_bind_s("%s@%s" % (user, domain), password)
debug(environ, 'Done binding')
addToCache(environ, user, password)
return True
except ldap.INVALID_CREDENTIALS:
print 'Invalid credentials'
return errors.get(ldap.INVALID_CREDENTIALS, None)
except ldap.SERVER_DOWN:
print 'Server down'
return errors.get(ldap.SERVER_DOWN, None)
except ldap.CONNECT_ERROR, e:
debug(environ, 'Connect error %s' % str(e))
return errors.get(ldap.CONNECT_ERROR, None)
def checkCache(environ, user, password):
try:
try:
ce = cache[user]
except Exception, e:
debug(environ, "error accessing cache %s" % str(e))
return False
debug(environ, "checkCache: found %s in cache" % user)
assert ce.checkExpired() is False
debug(environ, "checkCache: checking pw %s" % user)
good_pw = ce.checkPassword(password)
debug(environ, "checkCache: done checking pw %s" % user)
debug(environ, "%s good password?: %s" % (user, good_pw))
if good_pw:
ce.touch()
cache.sync()
return True
else:
return False
except AssertionError:
del cache[user]
cache.sync()
debug(environ,"Expired %s from cache" % user)
return False
except KeyError:
debug(environ, "%s not in cache" % user)
return False
except:
raise
def addToCache(environ, user, password):
try:
ce = CacheEntry(password)
cache[user] = ce
cache.sync()
debug(environ, "Added %s to cache" % user)
except:
debug(environ, "n error occured in auth.wsgi:addToCache")
raise
finally:
cache.close()
class CacheEntry(object):
SALT_SIZE = 256
HASH_FUNC = hashlib.sha512
def __init__(self, password):
self.salt = urandom(self.SALT_SIZE)
self.hash = self.HASH_FUNC(password + self.salt).hexdigest()
self.last_access = time.time()
def touch(self):
self.last_access = time.time()
def checkExpired(self):
age = time.time()-self.last_access
if (age) > CACHE_TIMEOUT:
return True
else:
return False
def checkPassword(self, password):
if self.HASH_FUNC(password + self.salt).hexdigest() == self.hash:
return True
else:
return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment