Skip to content

Instantly share code, notes, and snippets.

@ipimpat
Last active November 5, 2021 00:15
Show Gist options
  • Save ipimpat/5b9614db4c1e4794cc5d50a347e4a8ef to your computer and use it in GitHub Desktop.
Save ipimpat/5b9614db4c1e4794cc5d50a347e4a8ef to your computer and use it in GitHub Desktop.
CKAN AuthGuardMiddleware
import logging
from ckan.common import config, session
import ckan.plugins as plugins
import ckan.model as model
import ckan.lib.base as base
import ckan.lib.api_token as api_token
log = logging.getLogger('ckan.logic')
UNGUARDED_PATHS = ['/favicon.ico', '/error/document', '/user/login', '/ldap_login_handler', '/user/logged_in', '/user/logged_out', '/user/logged_out_redirect', '/util/redirect']
class AuthGuardMiddleware(object):
def __init__(self, app, app_conf):
self.app = app
def __call__(self, environ, start_response):
# We allow access if the requested path is not
# guarded but otherwise the client has be authorized
if not self._is_guarded(environ) or self._is_authorized(environ):
return self.app(environ, start_response)
# If we reach this point, it means that the client requested
# a guarded path, but was not authorized to access it, so we need
# to return an appropriate response based on the requested path.
if self._is_api_request(environ):
return self._response_unauthorized(environ, start_response)
else:
return self._redirect_login_form(environ, start_response)
def _redirect_login_form(self, environ, start_response):
# Redirect the client to the login page
start_response('307 Temporary Redirect', [
('Location', self._get_login_url(environ)),
('Content-Length','0'),
('X-Robots-Tag', 'noindex, nofollow, noarchive')
])
return ['']
def _response_unauthorized(self, environ, start_response):
# Send 400 Bad Request response
start_response('400 Bad Request', [('Content-type', 'application/json')])
return ['{"success": false, "error": "invalid authorization token"}']
def _is_guarded(self, environ):
# We allow annomouys access to asset files, language api and unguarded paths
if (environ['PATH_INFO'].startswith('/webassets/')
or environ['PATH_INFO'].startswith('/base/')
or environ['PATH_INFO'].startswith('/api/i18n/')
or environ['PATH_INFO'] in self._get_unguarded_paths()):
return False
return True
def _get_unguarded_paths(self):
# Get the value of the ckan.auth_guard.unguarded_paths setting
# from the CKAN config file or the default list defined in UNGUARDED_PATHS
# if the setting isn't in the config file.
return plugins.toolkit.aslist(config.get('ckan.auth_guard.unguarded_paths', UNGUARDED_PATHS))
def _is_authorized(self, environ):
if self._is_logged_in(environ):
return True
elif self._is_api_request(environ) and self._has_valid_api_key(environ):
return True
return False
def _is_logged_in(self, environ):
# check if the request belongs to an authenticated session
if 'ckanext-ldap-user' in session:
return True
elif 'repoze.who.identity' in environ:
return True
def _is_api_request(self, environ):
# check the requested path starts with /api/
return environ['PATH_INFO'].startswith('/api/')
def _get_client_api_key(self, environ):
# First look for a key in the user defined API key header
# look for HTTP_X_CKAN_API_KEY or HTTP_AUTHORIZATION
api_key = environ.get(self._get_api_key_header_name(), '')
# Then look for a key in the Authorization header
if not api_key:
api_key = environ.get('HTTP_AUTHORIZATION', '')
# but if the header contains whitespaces, we ignore it
if ' ' in api_key:
api_key = ''
if not api_key:
return None
return unicode(api_key)
def _get_api_key_header_name(self):
# Get the configured CKAN API key header name
header = config.get(base.APIKEY_HEADER_NAME_KEY, base.APIKEY_HEADER_NAME_DEFAULT)
return 'HTTP_' + header.replace('-', '_').upper()
def _has_valid_api_key(self, environ):
# Get client API key
api_key = self._get_client_api_key(environ)
if api_key is not None:
if api_token.get_user_from_token(api_key):
return True
query = model.Session.query(model.User)
if query.filter_by(apikey=api_key).count() > 0:
return True
return False
def _get_login_url(self, environ):
url = environ.get('HTTP_X_FORWARDED_PROTO') or environ.get('wsgi.url_scheme', 'http')
url += '://'
if environ.get('HTTP_HOST'):
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME']
url += '/user/login'
if environ['PATH_INFO'] != '' and environ['PATH_INFO'] != '/':
url += '?came_from=' + environ['PATH_INFO']
return url
class AuthGuardPlugin(plugins.SingletonPlugin):
plugins.implements(plugins.IMiddleware, inherit=True)
def make_middleware(self, app, config):
return AuthGuardMiddleware(app, config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment