-
-
Save ipimpat/5b9614db4c1e4794cc5d50a347e4a8ef to your computer and use it in GitHub Desktop.
CKAN AuthGuardMiddleware
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from ckan.common import config, session | |
import ckan.plugins as plugins | |
import ckan.model as model | |
import ckan.lib.base as base | |
import ckan.lib.api_token as api_token | |
# Module-level logger, registered under the 'ckan.logic' logger name.
log = logging.getLogger('ckan.logic')

# Default list of exact paths that may be requested without authorization.
# It is the fallback used when the ckan.auth_guard.unguarded_paths setting
# is not present in the CKAN config file.
UNGUARDED_PATHS = [
    '/favicon.ico',
    '/error/document',
    '/user/login',
    '/ldap_login_handler',
    '/user/logged_in',
    '/user/logged_out',
    '/user/logged_out_redirect',
    '/util/redirect',
]
class AuthGuardMiddleware(object):
    """WSGI middleware that denies anonymous access to guarded paths.

    Requests for unguarded paths are passed straight to the wrapped
    application.  Any other request must either belong to a logged-in
    session or, for ``/api/`` paths, carry a valid API key or API token.
    Unauthorized API requests receive a JSON error response; every other
    unauthorized request is redirected to the login form.
    """

    # Path prefixes that are always served without authorization:
    # asset files and the language API.
    _UNGUARDED_PREFIXES = ('/webassets/', '/base/', '/api/i18n/')

    def __init__(self, app, app_conf):
        """Wrap the WSGI application *app* (*app_conf* is not used)."""
        self.app = app

    def __call__(self, environ, start_response):
        """Handle one WSGI request.

        :param environ: the WSGI environment of the request
        :param start_response: the WSGI ``start_response`` callable
        :returns: the iterable response body
        """
        # Access is allowed if the requested path is not guarded;
        # otherwise the client has to be authorized.
        if not self._is_guarded(environ) or self._is_authorized(environ):
            return self.app(environ, start_response)

        # The client requested a guarded path without being authorized,
        # so pick the response that suits the kind of request.
        if self._is_api_request(environ):
            return self._response_unauthorized(environ, start_response)
        return self._redirect_login_form(environ, start_response)

    @staticmethod
    def _get_path(environ):
        """Return the request path, or ``''`` when PATH_INFO is absent.

        WSGI servers are allowed to omit PATH_INFO when it would be empty,
        so it must not be read with a plain ``environ[...]`` lookup.
        """
        return environ.get('PATH_INFO', '')

    def _redirect_login_form(self, environ, start_response):
        """Start a 307 redirect to the login page and return an empty body."""
        start_response('307 Temporary Redirect', [
            ('Location', self._get_login_url(environ)),
            ('Content-Length', '0'),
            ('X-Robots-Tag', 'noindex, nofollow, noarchive'),
        ])
        # WSGI response bodies must be bytestrings (PEP 3333).
        return [b'']

    def _response_unauthorized(self, environ, start_response):
        """Start a 400 JSON error response and return its body.

        NOTE(review): the 400 status is kept as-is because existing API
        clients may rely on it.
        """
        start_response('400 Bad Request',
                       [('Content-type', 'application/json')])
        # WSGI response bodies must be bytestrings (PEP 3333).
        return [b'{"success": false, "error": "invalid authorization token"}']

    def _is_guarded(self, environ):
        """Return ``True`` if the requested path requires authorization.

        Anonymous access is allowed to asset files, the language API and
        the configured unguarded paths.
        """
        path = self._get_path(environ)
        if path.startswith(self._UNGUARDED_PREFIXES):
            return False
        return path not in self._get_unguarded_paths()

    def _get_unguarded_paths(self):
        """Return the list of exact paths that need no authorization.

        The value comes from the ``ckan.auth_guard.unguarded_paths``
        setting in the CKAN config file, or from the default list in
        ``UNGUARDED_PATHS`` if the setting isn't in the config file.
        """
        configured = config.get('ckan.auth_guard.unguarded_paths',
                                UNGUARDED_PATHS)
        return plugins.toolkit.aslist(configured)

    def _is_authorized(self, environ):
        """Return ``True`` for a logged-in client or a valid API request."""
        if self._is_logged_in(environ):
            return True
        return (self._is_api_request(environ)
                and self._has_valid_api_key(environ))

    def _is_logged_in(self, environ):
        """Return ``True`` if the request belongs to an authenticated session.

        A request counts as logged in when the session contains the
        ``ckanext-ldap-user`` key or the environment contains the
        ``repoze.who.identity`` key (presumably set by ckanext-ldap and
        repoze.who respectively -- verify against the deployment).
        """
        return ('ckanext-ldap-user' in session
                or 'repoze.who.identity' in environ)

    def _is_api_request(self, environ):
        """Return ``True`` if the requested path starts with ``/api/``."""
        return self._get_path(environ).startswith('/api/')

    def _get_client_api_key(self, environ):
        """Return the API key sent by the client as text, or ``None``.

        The key is looked up first in the configured API key header and
        then in the ``Authorization`` header.
        """
        api_key = environ.get(self._get_api_key_header_name(), '')
        if not api_key:
            api_key = environ.get('HTTP_AUTHORIZATION', '')
            # An Authorization value containing a space is ignored
            # (presumably HTTP auth credentials rather than an API key).
            if ' ' in api_key:
                api_key = ''
        if not api_key:
            return None
        # Return text on both Python 2 and Python 3; ``bytes`` is ``str``
        # on Python 2, where header values arrive as byte strings.
        if isinstance(api_key, bytes):
            api_key = api_key.decode('utf-8', 'replace')
        return api_key

    def _get_api_key_header_name(self):
        """Return the WSGI environ key of the configured API key header."""
        header = config.get(base.APIKEY_HEADER_NAME_KEY,
                            base.APIKEY_HEADER_NAME_DEFAULT)
        return 'HTTP_' + header.replace('-', '_').upper()

    def _has_valid_api_key(self, environ):
        """Return ``True`` if the client sent a known API token or API key."""
        api_key = self._get_client_api_key(environ)
        if api_key is None:
            return False
        # First try the value as an API token ...
        if api_token.get_user_from_token(api_key):
            return True
        # ... then as the API key stored on a user.
        query = model.Session.query(model.User)
        return query.filter_by(apikey=api_key).count() > 0

    def _get_login_url(self, environ):
        """Return the absolute URL of the login page for this request.

        The scheme is taken from ``X-Forwarded-Proto`` when present,
        otherwise from ``wsgi.url_scheme``; the host from the ``Host``
        header, otherwise from ``SERVER_NAME``.  Unless the requested path
        is empty or ``/``, it is appended percent-encoded as the
        ``came_from`` query parameter.
        """
        try:
            from urllib.parse import quote  # Python 3
        except ImportError:
            from urllib import quote  # Python 2

        scheme = (environ.get('HTTP_X_FORWARDED_PROTO')
                  or environ.get('wsgi.url_scheme', 'http'))
        host = environ.get('HTTP_HOST') or environ['SERVER_NAME']
        url = scheme + '://' + host + '/user/login'

        path = self._get_path(environ)
        if path not in ('', '/'):
            # PATH_INFO is already URL-decoded, so it has to be encoded
            # again before being embedded in a query string.  Native
            # strings in the WSGI environ carry the raw request bytes as
            # latin-1 code points (PEP 3333), hence the encode step.
            if not isinstance(path, bytes):
                path = path.encode('latin-1', 'replace')
            url += '?came_from=' + quote(path)
        return url
class AuthGuardPlugin(plugins.SingletonPlugin):
    """CKAN plugin that implements IMiddleware to wrap the application
    in an AuthGuardMiddleware."""

    plugins.implements(plugins.IMiddleware, inherit=True)

    def make_middleware(self, app, config):
        """Return *app* wrapped in an AuthGuardMiddleware.

        ``config`` is handed to the middleware as its ``app_conf`` argument.
        """
        guarded_app = AuthGuardMiddleware(app, config)
        return guarded_app
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment