Create a gist now

Instantly share code, notes, and snippets.

Run hgwebdir with LDAP authentication
#!/usr/bin/env python
"""Run hgwebdir with LDAP authentication
Configuration::
[web]
# REQUIRED:
authldapuri = ldap://example.org/
authldapbasedn = ou=People,dc=example,dc=org
# OPTIONAL:
authrealm = Mercurial Repository
authldaptimeout = 5
authldapcachettl = 600
"""
import hashlib, ldap, time
from werkzeug import http
#from mercurial import demandimport; demandimport.enable()
from mercurial import hgweb
hgapp = hgweb.hgwebdir('hgweb.config')
_authcache = {} # credhash: validthru
def check_auth(username, password):
credhash = hashlib.sha1('%s:%s' % (username, password)).digest()
validthru = _authcache.get(credhash, 0)
if validthru >= time.time():
return True
ldapuri = hgapp.ui.config('web', 'authldapuri')
ldapbasedn = hgapp.ui.config('web', 'authldapbasedn')
if not ldapuri or not ldapbasedn:
hgapp.ui.warn('ldap: url or basedn not configured\n')
return False
conn = ldap.initialize(ldapuri)
conn.set_option(ldap.OPT_NETWORK_TIMEOUT,
hgapp.ui.configint('web', 'authldaptimeout', 5))
try:
conn.simple_bind_s('uid=%s,%s' % (username, ldapbasedn), password)
cachettl = hgapp.ui.configint('web', 'authldapcachettl', 600)
_authcache[credhash] = time.time() + cachettl
return True
except ldap.INVALID_CREDENTIALS:
return False
except ldap.LDAPError, err:
hgapp.ui.warn('ldap: %s\n' % err)
return False # error 500 instead?
finally:
conn.unbind()
# https://wsgi.readthedocs.org/en/latest/specifications/simple_authentication.html
def application(environ, start_response):
auth = http.parse_authorization_header(environ.pop('HTTP_AUTHORIZATION', None))
if auth and check_auth(auth.username, auth.password):
environ['REMOTE_USER'] = auth.username
def wrapped_start_response(status, headers, exc_info=None):
if status.startswith('401'):
realm = hgapp.ui.config('web', 'authrealm', 'Mercurial Repository')
headers.append(('WWW-Authenticate', 'Basic realm="%s"' % realm))
return start_response(status, headers, exc_info)
return hgapp(environ, wrapped_start_response)
if __name__ == '__main__':
from werkzeug import serving
serving.run_simple('localhost', 8000, application,
use_debugger=True, use_reloader=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment