Skip to content

Instantly share code, notes, and snippets.

@andreemidio
Created February 2, 2022 16:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreemidio/1ff309c93339d87254a4858d5a16624f to your computer and use it in GitHub Desktop.
Save andreemidio/1ff309c93339d87254a4858d5a16624f to your computer and use it in GitHub Desktop.
import logging
import re
from django.conf import settings
from django.http import JsonResponse
from keycloak import KeycloakOpenID, KeycloakInvalidTokenError
from rest_framework.exceptions import PermissionDenied, NotAuthenticated, AuthenticationFailed
logger = logging.getLogger(__name__)
class KeycloakMiddleware:
    """Django middleware that authorizes requests against Keycloak scopes.

    Configuration is read from ``settings.KEYCLOAK_CONFIG``. The keys
    ``KEYCLOAK_SERVER_URL``, ``KEYCLOAK_CLIENT_ID`` and ``KEYCLOAK_REALM``
    are required; the remaining keys are optional and default as shown in
    ``__init__``.

    Authorization happens in ``process_view``: a view opts in by exposing a
    ``keycloak_scopes`` mapping (HTTP method -> required scope, with an
    optional ``'DEFAULT'`` entry) on ``view_func.cls``. Views without that
    attribute are let through untouched.

    Configuration values are stored as plain public attributes
    (``config``, ``server_url``, ``client_id``, ``realm_name``,
    ``client_secret_key``, ``client_public_key``, ``default_access``,
    ``method_validated_token``, ``keycloak_authorization_config``,
    ``keycloak``).
    """

    def __init__(self, get_response):
        """Read the Keycloak settings and build the OpenID client.

        Args:
            get_response: The next middleware/view callable in the chain.

        Raises:
            Exception: If any of the required configuration keys is missing.
        """
        # NOTE: no super().__init__() arguments here -- the base class is
        # ``object``, whose __init__ rejects extra positional arguments.
        self.get_response = get_response
        self.config = settings.KEYCLOAK_CONFIG

        # Required settings.
        try:
            self.server_url = self.config['KEYCLOAK_SERVER_URL']
            self.client_id = self.config['KEYCLOAK_CLIENT_ID']
            self.realm_name = self.config['KEYCLOAK_REALM']
        except KeyError as e:
            logger.info(e)
            raise Exception(
                "KEYCLOAK_SERVER_URL, KEYCLOAK_CLIENT_ID or KEYCLOAK_REALM not found."
            ) from e

        # Optional settings.
        self.client_secret_key = self.config.get('KEYCLOAK_CLIENT_SECRET_KEY')
        self.client_public_key = self.config.get('KEYCLOAK_CLIENT_PUBLIC_KEY')
        self.default_access = self.config.get('KEYCLOAK_DEFAULT_ACCESS', 'DENY')
        self.method_validated_token = self.config.get(
            'KEYCLOAK_METHOD_VALIDATE_TOKEN', 'INTROSPECT'
        )
        self.keycloak_authorization_config = self.config.get(
            'KEYCLOAK_AUTHORIZATION_CONFIG'
        )

        self.keycloak = KeycloakOpenID(
            server_url=self.server_url,
            client_id=self.client_id,
            realm_name=self.realm_name,
            client_secret_key=self.client_secret_key,
        )
        if self.keycloak_authorization_config:
            self.keycloak.load_authorization_config(
                self.keycloak_authorization_config
            )

    @property
    def realm(self):
        """Alias for ``realm_name`` (the value of ``KEYCLOAK_REALM``)."""
        return self.realm_name

    @realm.setter
    def realm(self, value):
        self.realm_name = value

    def __call__(self, request, *args, **kwargs):
        """Pass the request on to the next handler and return its response."""
        return self.get_response(request)

    @staticmethod
    def _extract_token(header):
        """Return the token carried by an ``Authorization`` header value.

        A two-part value (``"<scheme> <token>"``) yields the second part;
        any other non-empty value yields its first whitespace-separated
        part. Returns ``None`` when the header is missing, empty or blank.
        """
        parts = (header or '').split()
        if not parts:
            return None
        return parts[1] if len(parts) == 2 else parts[0]

    @staticmethod
    def _resolve_scope(view_scopes, method):
        """Return the scope for *method*, else the ``'DEFAULT'`` one, else None."""
        return view_scopes.get(method) or view_scopes.get('DEFAULT')

    @staticmethod
    def _error_response(exception_class):
        """Build a JSON error response from a DRF exception class.

        The HTTP status line carries ``exception_class.status_code``. The
        code is also kept in the JSON body under ``status`` so clients that
        already parse that field keep working.
        """
        return JsonResponse(
            dict(
                detail=exception_class.default_detail,
                status=exception_class.status_code,
            ),
            status=exception_class.status_code,
        )

    def process_view(self, request, view_func, view_args, view_kwargs):
        """Authorize *request* for *view_func*.

        Returns:
            ``None`` to let Django continue to the view, or a
            ``JsonResponse`` describing why the request was rejected.
        """
        # Paths matched by any configured exemption regex skip the check.
        exempt_patterns = getattr(
            settings, 'KEYCLOAK_BEARER_AUTHENTICATION_EXEMPT_PATHS', None
        )
        if exempt_patterns:
            path = request.path_info.lstrip('/')
            if any(re.match(pattern, path) for pattern in exempt_patterns):
                logger.debug("** Exclude path found, skipping")
                return None

        # Views that do not declare keycloak_scopes are not protected.
        try:
            view_scopes = view_func.cls.keycloak_scopes
        except AttributeError as e:
            logger.debug(e)
            logger.debug(
                'Allowing free acesss, since no authorization configuration '
                '(keycloak_scopes) found for this request route :%s',
                request,
            )
            return None

        # A missing or blank Authorization header means "not authenticated".
        token = self._extract_token(request.META.get('HTTP_AUTHORIZATION'))
        if token is None:
            return self._error_response(NotAuthenticated)

        required_scope = self._resolve_scope(view_scopes, request.method)
        if not required_scope and self.default_access == 'DENY':
            return self._error_response(PermissionDenied)
        # NOTE(review): when no scope is configured and default_access is not
        # 'DENY', the check below still runs with required_scope=None --
        # confirm whether such requests are meant to be allowed instead.

        try:
            user_permissions = self.keycloak.get_permissions(
                token=token,
                method_token_info=self.method_validated_token.lower(),
                key=self.client_public_key,
            )
        except KeycloakInvalidTokenError as e:
            logger.info(e)
            return self._error_response(AuthenticationFailed)

        # Allow as soon as one granted permission carries the required scope.
        if any(
            required_scope in permission.scopes
            for permission in user_permissions
        ):
            return None

        return self._error_response(PermissionDenied)
@andreemidio
Copy link
Author

2022-02-02 13_02_48-Window

@andreemidio
Copy link
Author

2022-02-02 13_03_58-Window

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment