Created September 14, 2012
WSGI middleware that provides SPNEGO authentication using the Win32 extensions
from base64 import b64decode, b64encode
from lru_dict import LruDict
import win32api
import win32security
import sspi, sspicon
__all__ = ['WindowsAuth']
def _get_user_name():
    '''Uses the Windows API to retrieve the current user name.

    Returns the result of ``win32api.GetUserName()``. If that call raises
    ``win32api.error``, returns instead the ``repr()`` of
    ``win32api.GetUserNameEx(win32api.NameSamCompatible)``.
    '''
    try:
        return win32api.GetUserName()
    except win32api.error:
        # Seeing 'access denied' errors here for non-local users (presumably
        # without permission to login locally). Get the fully-qualified
        # username, although a side-effect of these permission-denied errors
        # is a lack of Python codecs - so printing the Unicode value fails.
        # So just return the repr(), and avoid codecs completely.
        return repr(win32api.GetUserNameEx(win32api.NameSamCompatible))
def _get_negotiate_handler():
'''Factory to create a SSPI Authentication handler for Negotiate (RFC 4178)'''
# Builds an sspi.ServerAuth whose scflags keyword is the bitwise OR (via
# reduce) of a list of flag values.
# NOTE(review): the sspi.ServerAuth( call and the list passed to reduce() are
# never closed in this view, so the remaining arguments and the individual
# flags are not visible here - confirm against the complete file.
return sspi.ServerAuth(
scflags = reduce(lambda x, y: x | y, [
def _get_ntlm_hanlder():
    '''Factory returning a new sspi.ServerAuth handler for the 'NTLM' package'''
    # NOTE: the function name is misspelled ("hanlder"); it is kept as-is
    # because WindowsAuth.auth_handlers looks the factory up by this name.
    ntlm_handler = sspi.ServerAuth('NTLM')
    return ntlm_handler
class WindowsAuth:
'''The WSGI Middleware'''
def __init__(self, wrapped, application_name):
    '''Set up the middleware around a WSGI application.

    wrapped          -- the WSGI application that is invoked once
                        authentication has succeeded
    application_name -- string placed in the realm of the Basic
                        WWW-Authenticate challenge
    '''
    # Factory functions for each SSPI authentication type, keyed by the
    # scheme identifier found in the Authorization header.
    self.auth_handlers = {
        'Negotiate': _get_negotiate_handler,
        'NTLM': _get_ntlm_hanlder,
    }

    # LRU cache of in-progress handshakes, keyed by the connection token.
    self.current_sessions = LruDict(size=10000)

    # Realm string used when challenging for Basic authentication.
    self.application_name = application_name

    # The wrapped WSGI application, called only after authentication
    # succeeds.
    self.wrapped = wrapped
def __call__(self, environ, start_response):
# WSGI entry point. Reads the Authorization header from environ and
# authenticates the request: 'Negotiate'/'NTLM' go through an SSPI handler,
# 'Basic' goes through win32security.LogonUser. On success
# environ['REMOTE_USER'] is set and the wrapped application is called; a
# request without the header is answered with a 401 carrying a Basic
# challenge, and failures are answered by error() or noauth().
# NOTE(review): this code uses Python 2 syntax (print statement,
# 'except X, name:').
# NOTE(review): in this view several 'except' clauses have no visible 'try:',
# the header lists passed to start_response are not closed, and no 'else:' is
# visible between the two assignments to handler. The lines that would supply
# them appear to be missing - confirm against the original file before
# changing any logic here.
def error():
'''Returns a standard error response'''
start_response('500 Error', [
('content-type', 'text/plain'),
return ['Internal error']
def noauth():
'''Returns a standard response for the inital request for a resource'''
start_response('401 Unauthorized', [
('content-type', 'text/plain'),
('WWW-Authenticate','Basic realm="{0}"'.format(self.application_name))
return ['Unauthorized']
# check that the client set the Authorization header; otherwise, we assume this
# is an initial request
if 'HTTP_AUTHORIZATION' not in environ:
return noauth()
# try to parse the Authorization header into "<scheme> <credentials>",
# splitting on the first space only
# NOTE(review): the local name 'type' shadows the builtin of the same name
type, authstr = environ['HTTP_AUTHORIZATION'].split(' ', 1)
except Exception as e:
print 'An error occurred while parsing the Authorization header:', environ.get('HTTP_AUTHORIZATION', ''), e
return error()
# cache the start_response function as this may
# need to be overridden later
_start_response = start_response
# check the type of Authorization
if type == 'Negotiate' or type == 'NTLM':
# we're doing SPNEGO, which can require multiple steps (see RFC 4559)
# so we're going to use the connection_token to identify this conversation
connection_token = self._get_connection_token(environ)
# check to see if we have an on-going conversation and either load that
# session or create a new one
if connection_token in self.current_sessions:
handler = self.current_sessions[connection_token]
handler = self.auth_handlers[type]()
# now we call SSPI and attempt to authorize the provided token
# (the credentials part of the header is base64-decoded first)
continue_token = None
_, continue_token = handler.authorize(b64decode(authstr))
except sspi.error, details:
print "Failed to authorize client:", details
return error()
# NOTE(review): any other exception is turned into a 500 response without
# being printed or logged
except Exception as e:
return error()
# check to see if we have a non-empty continue_token which should be sent
# to the client
if continue_token != None and len(continue_token[0].Buffer) > 0:
# stash the on-going conversation
# (the token is recomputed here rather than reusing connection_token)
self.current_sessions[self._get_connection_token(environ)] = handler
# the server might have a last step of the handshake to send to the client even
# after the client has finished its part, so we make a check: if we can create
# a security context using the provided token and we still have a token to send
# to the client, we simply add the WWW-Authenticate header to whatever the final
# response is and call it done
# NOTE(review): the statement that performs this check is not visible here
# replacement start_response that appends the base64-encoded continue token
# as a WWW-Authenticate header before delegating to the real start_response
def _start_response(status, headers, exc_info=None):
headers.append(('WWW-Authenticate', ' '.join([type, b64encode(continue_token[0].Buffer)])))
return start_response(status, headers, exc_info)
except Exception as e:
# at this point, there was some sort of error creating a context from the
# token supplied by the client so far, so we deny access and request that
# the client continues the handshake
start_response('401 Unauthorized', [
('content-type', 'text/plain'),
('WWW-Authenticate', ' '.join([type, b64encode(continue_token[0].Buffer)]))
return ['Unauthorized']
# at this point, we've completed the handshake either having no data
# to return to the client or with one last header to return, so we
# remove the current session from the cache
if connection_token in self.current_sessions:
del self.current_sessions[connection_token]
# attempt to impersonate the context for the client to obtain the user id
# NOTE(review): the impersonation call itself is not visible in this view
except Exception as e:
print "Failed to impersonate user:", e
return error()
# once we've gotten here, we have successfully authenticated as the client
# so we set the REMOTE_USER variable for downstream processing
environ['REMOTE_USER'] = _get_user_name()
# finally, we revert the context for the application back to its default token
# to prevent unexpected issues
# NOTE(review): the revert call itself is not visible in this view
elif type == 'Basic':
# Basic authentication is much more straight-forward, we simply extract
# the user name and password from the header (base64-decoded, then split on
# the first ':')
username, password = b64decode(authstr).split(':',1)
# attempt to logon as the user (None is passed as the second argument)
win32security.LogonUser(username, None, password, win32security.LOGON32_LOGON_NETWORK, win32security.LOGON32_PROVIDER_DEFAULT)
# logon failed, so tell the client to try again
# NOTE(review): presumably this return sits in an exception handler around
# LogonUser; no 'try:'/'except' is visible here - confirm
return noauth()
# authentication succeeded, so set the REMOTE_USER variable
environ['REMOTE_USER'] = username
# since we haven't reported an error or requested more information from the
# client, pass the request to the down-stream application
# NOTE(review): no branch is visible for any other Authorization scheme, so
# as shown such a request would reach this call without REMOTE_USER being set
# here - confirm
return self.wrapped(environ, _start_response)
def _get_connection_token(self, environ):
    '''Return the key under which a client's handshake is tracked.

    Uses environ['REMOTE_HOST'] when it is present and is neither None nor
    the empty string; otherwise falls back to environ['REMOTE_ADDR'].

    Raises KeyError when the fallback is needed and 'REMOTE_ADDR' is not
    in environ.
    '''
    # NOTE(review): the token is only the remote host/address, so requests
    # arriving from the same address map to the same token - confirm this
    # is acceptable for the deployment (e.g. clients behind a shared proxy).
    remote_host = environ.get('REMOTE_HOST')
    if remote_host is not None and remote_host != '':
        return remote_host
    return environ['REMOTE_ADDR']
