Skip to content

Instantly share code, notes, and snippets.

@ig0774
Created March 25, 2011 19:52
Show Gist options
  • Save ig0774/887519 to your computer and use it in GitHub Desktop.
Save ig0774/887519 to your computer and use it in GitHub Desktop.
WSGI middleware that provides SPNEGO authentication using the Win32 extensions
import logging
from base64 import b64decode, b64encode

import sspi
import sspicon
import win32api
import win32security
from lru_dict import LruDict
__all__ = ['WindowsAuth']
def _get_user_name():
    """Use the Windows API to retrieve the current user name.

    Returns:
        The value of ``win32api.GetUserName()``. If that call raises
        ``win32api.error``, the ``repr()`` of the SAM-compatible name
        returned by ``win32api.GetUserNameEx`` is returned instead.

    Raises:
        win32api.error: if the fallback ``GetUserNameEx`` call fails too.
    """
    try:
        return win32api.GetUserName()
    except win32api.error:
        # Seeing 'access denied' errors here for non-local users (presumably
        # without permission to log in locally). Fall back to the
        # fully-qualified username. A side effect of these permission-denied
        # errors is a lack of Python codecs, so printing the Unicode value
        # fails; return the repr() and avoid codecs completely.
        return repr(win32api.GetUserNameEx(win32api.NameSamCompatible))
def _get_negotiate_handler():
    """Build a fresh SSPI server-side handler for the Negotiate package.

    The security-context requirement flags passed as ``scflags`` are the
    bitwise OR of the ``sspicon.ASC_REQ_*`` constants listed below.

    Returns:
        A new ``sspi.ServerAuth`` instance configured for 'Negotiate'.
    """
    context_requirements = (
        sspicon.ASC_REQ_INTEGRITY
        | sspicon.ASC_REQ_SEQUENCE_DETECT
        | sspicon.ASC_REQ_REPLAY_DETECT
        | sspicon.ASC_REQ_DELEGATE
        | sspicon.ASC_REQ_CONFIDENTIALITY
        | sspicon.ASC_REQ_STREAM
        | sspicon.ASC_REQ_USE_SESSION_KEY
    )
    return sspi.ServerAuth('Negotiate', scflags=context_requirements)
def _get_ntlm_hanlder():
    """Build a fresh SSPI server-side handler for the NTLM package.

    The function name keeps its existing spelling because the
    ``auth_handlers`` table in ``WindowsAuth`` refers to it by this name.

    Returns:
        A new ``sspi.ServerAuth`` instance configured for 'NTLM'.
    """
    package_name = 'NTLM'
    return sspi.ServerAuth(package_name)
class WindowsAuth(object):
    """WSGI middleware that authenticates requests against Windows accounts.

    Requests without an ``Authorization`` header are answered with a 401 that
    offers the Negotiate, NTLM and Basic schemes. Negotiate and NTLM tokens
    are handed to an SSPI server handler; Basic credentials are checked with
    ``win32security.LogonUser``. On success ``environ['REMOTE_USER']`` is set
    and the request is passed to the wrapped application. Any other scheme is
    answered with the same 401 challenge and never reaches the wrapped
    application.
    """

    _log = logging.getLogger(__name__)

    def __init__(self, wrapped, application_name):
        """Create the middleware.

        Args:
            wrapped: the WSGI application invoked once authentication
                succeeds.
            application_name: string used as the realm of the Basic
                challenge.
        """
        # Factory functions for the SSPI handlers, keyed by the
        # authentication scheme's identifier.
        self.auth_handlers = {
            'Negotiate': _get_negotiate_handler,
            'NTLM': _get_ntlm_hanlder,
        }
        # Cache of in-progress handshakes keyed by the connection token
        # (described by the original author as a timed LRU cache).
        self.current_sessions = LruDict(size=10000)
        self.application_name = application_name
        self.wrapped = wrapped

    def __call__(self, environ, start_response):
        """Authenticate the request, then delegate to the wrapped application.

        Args:
            environ: the WSGI environment; ``REMOTE_USER`` is set on success.
            start_response: the WSGI ``start_response`` callable.

        Returns:
            The wrapped application's response iterable when authentication
            succeeds, otherwise a one-element body for a 401 or 500 response.
        """
        authorization = environ.get('HTTP_AUTHORIZATION')
        if authorization is None:
            # No credentials yet: treat this as the initial request.
            return self._challenge(start_response)

        try:
            scheme, credentials = authorization.split(' ', 1)
        except (AttributeError, ValueError):
            # The header value is deliberately not logged: it may carry
            # credentials.
            self._log.warning(
                'An error occurred while parsing the Authorization header')
            return self._server_error(start_response)

        if scheme in self.auth_handlers:
            return self._authenticate_sspi(
                scheme, credentials, environ, start_response)
        if scheme == 'Basic':
            return self._authenticate_basic(
                credentials, environ, start_response)
        # Unsupported scheme: challenge again instead of letting the request
        # through unauthenticated.
        return self._challenge(start_response)

    def _server_error(self, start_response):
        """Send the standard 500 response and return its body."""
        start_response('500 Error', [
            ('content-type', 'text/plain'),
        ])
        return [b'Internal error']

    def _challenge(self, start_response):
        """Send the 401 response advertising every supported scheme."""
        start_response('401 Unauthorized', [
            ('content-type', 'text/plain'),
            ('WWW-Authenticate', 'Negotiate'),
            ('WWW-Authenticate', 'NTLM'),
            ('WWW-Authenticate',
             'Basic realm="{0}"'.format(self.application_name)),
        ])
        return [b'Unauthorized']

    @staticmethod
    def _encode_token(raw):
        """Base64-encode *raw* and return the result as a native string."""
        encoded = b64encode(raw)
        if not isinstance(encoded, str):
            encoded = encoded.decode('ascii')
        return encoded

    def _forget_session(self, connection_token):
        """Remove any cached handshake stored under *connection_token*."""
        if connection_token in self.current_sessions:
            del self.current_sessions[connection_token]

    def _authenticate_sspi(self, scheme, credentials, environ, start_response):
        """Run one step of a Negotiate/NTLM handshake (see RFC 4559).

        The handshake can span several requests, so the SSPI handler is kept
        in ``current_sessions`` between steps.
        """
        # NOTE(review): the conversation is identified only by the client's
        # host/address, so clients sharing an address share handshake state.
        connection_token = self._get_connection_token(environ)
        if connection_token in self.current_sessions:
            handler = self.current_sessions[connection_token]
        else:
            handler = self.auth_handlers[scheme]()
            handler.reset()

        try:
            _, continue_token = handler.authorize(b64decode(credentials))
        except Exception as exc:
            self._log.warning('Failed to authorize client: %s', exc)
            # Drop the failed conversation so the next request from this
            # client starts from a fresh handler.
            self._forget_session(connection_token)
            return self._server_error(start_response)

        # A non-empty continue token must be sent back to the client.
        reply_header = None
        if continue_token is not None and len(continue_token[0].Buffer) > 0:
            reply_header = (
                'WWW-Authenticate',
                ' '.join([scheme,
                          self._encode_token(continue_token[0].Buffer)]),
            )
            self.current_sessions[connection_token] = handler

        try:
            handler.ctxt.ImpersonateSecurityContext()
        except Exception as exc:
            if reply_header is not None:
                # No usable security context yet: keep the stashed
                # conversation and ask the client to continue the handshake.
                start_response('401 Unauthorized', [
                    ('content-type', 'text/plain'),
                    reply_header,
                ])
                return [b'Unauthorized']
            self._forget_session(connection_token)
            self._log.warning('Failed to impersonate user: %s', exc)
            return self._server_error(start_response)

        # The handshake is complete, so the cached conversation is no longer
        # needed.
        self._forget_session(connection_token)
        try:
            environ['REMOTE_USER'] = _get_user_name()
        finally:
            # Always switch back to the application's own token, even when
            # the user name lookup fails.
            try:
                handler.ctxt.RevertSecurityContext()
            except Exception:
                self._log.exception('Failed to revert the security context')

        if reply_header is None:
            return self.wrapped(environ, start_response)

        # The server still has a final token for the client: attach it to
        # whatever response the wrapped application produces.
        def start_response_with_token(status, headers, exc_info=None):
            return start_response(
                status, list(headers) + [reply_header], exc_info)

        return self.wrapped(environ, start_response_with_token)

    def _authenticate_basic(self, credentials, environ, start_response):
        """Check Basic credentials by attempting a Windows network logon."""
        try:
            decoded = b64decode(credentials)
            if not isinstance(decoded, str):
                decoded = decoded.decode('utf-8')
            username, password = decoded.split(':', 1)
        except (TypeError, ValueError):
            # Undecodable credentials are treated like a failed logon.
            return self._challenge(start_response)

        try:
            win32security.LogonUser(
                username, None, password,
                win32security.LOGON32_LOGON_NETWORK,
                win32security.LOGON32_PROVIDER_DEFAULT)
        except Exception:
            # Logon failed, so tell the client to try again.
            return self._challenge(start_response)

        environ['REMOTE_USER'] = username
        return self.wrapped(environ, start_response)

    def _get_connection_token(self, environ):
        """Return the key identifying the client's handshake.

        Uses ``REMOTE_HOST`` when it is present and non-empty, otherwise
        ``REMOTE_ADDR``.
        """
        return environ.get('REMOTE_HOST') or environ['REMOTE_ADDR']
@ig0774
Copy link
Author

ig0774 commented Mar 25, 2011

This is basically a reworking of this recipe, borrowing some ideas from the excellent Waffle project. The current_sessions variable can be initialized to any caching solution you like, though I went with Michael Palmer's LruDict as it also adds time limits to the cache.

@ig0774
Copy link
Author

ig0774 commented Apr 25, 2011

For some reason, certain versions of IE do not seem to support Negotiate with sspicon.ASC_REQ_REPLAY_DETECT set, though I believe this may be an artifact of running this through WSGI, which always sets the Connection header to 'close'.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment