Created
February 2, 2022 16:03
-
-
Save andreemidio/1ff309c93339d87254a4858d5a16624f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import re | |
from django.conf import settings | |
from django.http import JsonResponse | |
from keycloak import KeycloakOpenID, KeycloakInvalidTokenError | |
from rest_framework.exceptions import PermissionDenied, NotAuthenticated, AuthenticationFailed | |
logger = logging.getLogger(__name__) | |
class KeycloakMiddleware: | |
def __init__(self, get_response): | |
super().__init__(self, get_response) | |
self.get_response = get_response | |
self.config = settings.KEYCLOAK_CONFIG | |
try: | |
self.server_url = self.config['KEYCLOAK_SERVER_URL'] | |
self.client_id = self.config['KEYCLOAK_CLIENT_ID'] | |
self.realm_name = self.config['KEYCLOAK_REALM'] | |
except KeyError as e: | |
logger.info(e) | |
raise Exception("KEYCLOAK_SERVER_URL, KEYCLOAK_CLIENT_ID or KEYCLOAK_REALM not found.") | |
self.client_secret_key = self.config.get('KEYCLOAK_CLIENT_SECRET_KEY', None) | |
self.client_public_key = self.config.get('KEYCLOAK_CLIENT_PUBLIC_KEY', None) | |
self.default_access = self.config.get('KEYCLOAK_DEFAULT_ACCESS', 'DENY') | |
self.method_validated_token = self.config.get('KEYCLOAK_METHOD_VALIDATE_TOKEN', 'INTROSPECT') | |
self.keycloak_authorization_config = self.config.get('KEYCLOAK_AUTHORIZATION_CONFIG', None) | |
self.keycloak = KeycloakOpenID( | |
server_url=self.server_url, | |
client_id=self.client_id, | |
realm_name=self.realm_name, | |
client_secret_key=self.client_secret_key | |
) | |
if self.keycloak_authorization_config: | |
self.keycloak.load_authorization_config(self.keycloak_authorization_config) | |
self.get_response = get_response | |
@property | |
def keycloak(self): | |
return self._keycloak | |
@keycloak.setter | |
def keycloak(self, value): | |
self._keycloak = value | |
@keycloak.setter | |
def keycloak_setter(self, value): | |
self._keycloak = value | |
@keycloak.setter | |
def keycloak(self, value): | |
self._keycloak = value | |
@property | |
def config(self): | |
return self._config | |
@config.setter | |
def config(self, value): | |
self._config = value | |
@property | |
def server_url(self): | |
return self._server_url | |
@server_url.setter | |
def server_url(self, value): | |
self._server_url = value | |
@property | |
def client_id(self): | |
return self._client_id | |
@client_id.setter | |
def client_id(self, value): | |
self._client_id = value | |
@property | |
def client_secret_key(self): | |
return self._client_secret_key | |
@client_secret_key.setter | |
def client_secret_key(self, value): | |
self._client_secret_key = value | |
@property | |
def client_public_key(self): | |
return self._client_public_key | |
@client_public_key.setter | |
def client_public_key(self, value): | |
self._client_public_key = value | |
@property | |
def realm(self): | |
return self._realm | |
@realm.setter | |
def realm(self, value): | |
self._realm = value | |
@property | |
def keycloak_authorization_config(self): | |
return self._keycloak_authorization_config | |
@keycloak_authorization_config.setter | |
def keycloak_authorization_config(self, value): | |
self._keycloak_authorization_config = value | |
@property | |
def method_validated_token(self): | |
return self._method_validated_token | |
@method_validated_token.setter | |
def method_validated_token(self, value): | |
self._method_validated_token = value | |
def __call__(self, request, *args, **kwargs): | |
return self.get_response(request) | |
def process_view(self, request, view_func, view_args, view_kwargs): | |
if hasattr(settings, 'KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS'): | |
path = request.path_info.lstrip('/') | |
if any(re.match(m, path) for m in settings.KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS): | |
logger.debug("** Exclude path found, skipping") | |
return None | |
try: | |
view_scopes = view_func.cls.keycloak_scopes | |
except AttributeError as e: | |
logger.debug(e) | |
logger.debug( | |
f'Allowing free acesss, since no authorization configuration (keycloak_scopes) found for this request route :{request}') | |
return None | |
if 'HTTP_AUTHORIZATION' not in request.META: | |
return JsonResponse( | |
dict( | |
detail=PermissionDenied.default_detail, | |
status=NotAuthenticated.status_code | |
) | |
) | |
auth_header = request.META.get('HTTP_AUTHORIZATION').split() | |
token = auth_header[1] if len(auth_header) == 2 else auth_header[0] | |
required_scope = view_scopes.get(request.method, None) \ | |
if view_scopes.get(request.method, None) else view_scopes.get('DEFAULT', None) | |
if not required_scope and self.default_access == 'DENY': | |
return JsonResponse( | |
dict( | |
detail=PermissionDenied.default_detail, | |
status=PermissionDenied.status_code | |
) | |
) | |
try: | |
user_permissions = self.keycloak.get_permissions( | |
token=token, | |
method_token_info=self._method_validated_token.lower(), | |
key=self.client_public_key | |
) | |
except KeycloakInvalidTokenError as e: | |
logger.info(e) | |
return JsonResponse( | |
dict( | |
detail=AuthenticationFailed.default_detail, | |
status=AuthenticationFailed.status_code | |
) | |
) | |
for permissions in user_permissions: | |
if required_scope in permissions.scopes: | |
return None | |
return JsonResponse( | |
dict( | |
detail=PermissionDenied.default_detail, | |
status=PermissionDenied.status_code | |
) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment