-
-
Save kwharrigan/3190379 to your computer and use it in GitHub Desktop.
A Python port of Perl's Apache::AuthenMSAD
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
authen_msad | |
A port of Perl's Apache::AuthenMSAD | |
Takes advantage of Microsoft Active Directory allowing a user to be verified | |
with 'user@domain' instead of searching for the distinguished name. | |
To merge as seamlessly as possible with existing systems (i.e. SharePoint, | |
etc.) munge the incoming "domain\user" into "user@domain". | |
Implementations for both mod_wsgi and the deprecated mod_python are included. | |
Authors: | |
bollwyvl - Initial implementation | |
kwharrigan - Added TLS | |
Some caching code (modified to use shelve) due to Reed L. O'Brien | |
""" | |
from os import urandom | |
import ldap | |
import shelve | |
import hashlib | |
import time | |
CACHE_LOCATION = 'snip' | |
CACHE_TIMEOUT = 60 | |
cache = shelve.open(CACHE_LOCATION, writeback=True) | |
def debug(env, string): | |
print >> env['wsgi.errors'], string | |
def check_password(environ, user, password): | |
""" | |
This is the handler used by mod_wsgi. | |
Sample httpd.conf entry: | |
<Location /directory.to.protect/> | |
AuthType Basic | |
AuthName "This Content is Protected" | |
Require valid-user | |
AuthBasicProvider wsgi | |
WSGIAuthUserScript /path/to/authen_msad.py | |
SetEnv authen_msad.ldap_url ldap://<server ip address>:1234/ | |
</Location> | |
""" | |
try: | |
# Setenv doesn't do what we want, so we have to hard code ths in | |
domain, user = user.split("\\") | |
con_options = {} | |
ldap_url = 'ldap://snip' | |
use_tls = True | |
cacertfile = '/etc/pki/tls/certs/snip.cer' | |
newctx = '0' | |
tls_demand = '1' | |
if cacertfile: | |
con_options[ldap.OPT_X_TLS_CACERTFILE] = cacertfile | |
if newctx: | |
con_options[ldap.OPT_X_TLS_NEWCTX] = int(newctx) | |
if tls_demand: | |
con_options[ldap.OPT_X_TLS_DEMAND] = int(tls_demand) | |
debug(environ,'Checking cache') | |
if checkCache(environ, user, password): | |
return True | |
except: | |
return None | |
debug(environ, 'Checking MSAD') | |
return msad_check_user(user=user, | |
domain=domain, | |
password=password, | |
ldap_url=ldap_url, | |
use_tls=use_tls, | |
con_options=con_options, | |
environ=environ, | |
errors={ | |
ldap.INVALID_CREDENTIALS: None, | |
ldap.SERVER_DOWN: None, | |
ldap.CONNECT_ERROR: None | |
}) | |
def msad_check_user(user, domain, password, ldap_url, con_options={}, use_tls=True, environ={}, errors={}): | |
""" | |
The bit that actually talks to the ActiveDirectory server. | |
""" | |
try: | |
ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255) | |
# http://helpful.knobs-dials.com/index.php/Python-ldap_noteS | |
ldap.set_option(ldap.OPT_REFERRALS, 0) | |
ldap_client = ldap.initialize(ldap_url) | |
ldap_client.protocol_version = ldap.VERSION3 | |
for key, item in con_options.items(): | |
debug(environ, 'Setting %d to %s' % (key, item)) | |
ldap_client.set_option(int(key), item) | |
if use_tls: | |
debug(environ, 'Enabling TLS') | |
error_code = ldap_client.start_tls_s() | |
debug(environ, 'TLS Enabled') | |
debug(environ, 'Binding') | |
ldap_client.simple_bind_s("%s@%s" % (user, domain), password) | |
debug(environ, 'Done binding') | |
addToCache(environ, user, password) | |
return True | |
except ldap.INVALID_CREDENTIALS: | |
print 'Invalid credentials' | |
return errors.get(ldap.INVALID_CREDENTIALS, None) | |
except ldap.SERVER_DOWN: | |
print 'Server down' | |
return errors.get(ldap.SERVER_DOWN, None) | |
except ldap.CONNECT_ERROR, e: | |
debug(environ, 'Connect error %s' % str(e)) | |
return errors.get(ldap.CONNECT_ERROR, None) | |
def checkCache(environ, user, password): | |
try: | |
try: | |
ce = cache[user] | |
except Exception, e: | |
debug(environ, "error accessing cache %s" % str(e)) | |
return False | |
debug(environ, "checkCache: found %s in cache" % user) | |
assert ce.checkExpired() is False | |
debug(environ, "checkCache: checking pw %s" % user) | |
good_pw = ce.checkPassword(password) | |
debug(environ, "checkCache: done checking pw %s" % user) | |
debug(environ, "%s good password?: %s" % (user, good_pw)) | |
if good_pw: | |
ce.touch() | |
cache.sync() | |
return True | |
else: | |
return False | |
except AssertionError: | |
del cache[user] | |
cache.sync() | |
debug(environ,"Expired %s from cache" % user) | |
return False | |
except KeyError: | |
debug(environ, "%s not in cache" % user) | |
return False | |
except: | |
raise | |
def addToCache(environ, user, password): | |
try: | |
ce = CacheEntry(password) | |
cache[user] = ce | |
cache.sync() | |
debug(environ, "Added %s to cache" % user) | |
except: | |
debug(environ, "n error occured in auth.wsgi:addToCache") | |
raise | |
finally: | |
cache.close() | |
class CacheEntry(object): | |
SALT_SIZE = 256 | |
HASH_FUNC = hashlib.sha512 | |
def __init__(self, password): | |
self.salt = urandom(self.SALT_SIZE) | |
self.hash = self.HASH_FUNC(password + self.salt).hexdigest() | |
self.last_access = time.time() | |
def touch(self): | |
self.last_access = time.time() | |
def checkExpired(self): | |
age = time.time()-self.last_access | |
if (age) > CACHE_TIMEOUT: | |
return True | |
else: | |
return False | |
def checkPassword(self, password): | |
if self.HASH_FUNC(password + self.salt).hexdigest() == self.hash: | |
return True | |
else: | |
return False | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment