Flask OIDC / Keycloak Check resource access
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
# flask_oidc_check_resource_access.py | |
# Written by Maximilian Thoma 2022 | |
# Visit https://www.lanbugs.de | |
# Free to use for everyone, without any warranty :-) | |
# | |
# Manual / Remarks | |
# I use it together with flask-oidc and Keycloak to give permission to API endpoint only to defined role. | |
# Its a wrapper function you can use together with @oidc.accept_token | |
# | |
# Example: | |
# app = Flask(__name__) | |
# oidc = OpenIDConnect(app) | |
# | |
# @app.route("/api") | |
# @oidc.accept_token(require_token=True) | |
# @check_resource_access("client_name", "role_name") | |
# def hello_api(): | |
# return {"say": "hello"} | |
# | |
# Required packages | |
# - flask | |
# - flask-oidc | |
# - pyjwt | |
# | |
from functools import wraps | |
from flask import request | |
import json | |
import jwt | |
def check_resource_access(client, role): | |
def wrapper(view_func): | |
@wraps(view_func) | |
def decorated(*args, **kwargs): | |
token = None | |
if 'Authorization' in request.headers and request.headers['Authorization'].startswith('Bearer '): | |
token = request.headers['Authorization'].split(None,1)[1].strip() | |
if 'access_token' in request.form: | |
token = request.form['access_token'] | |
elif 'access_token' in request.args: | |
token = request.args['access_token'] | |
decoded = jwt.decode(token, options={"verify_signature": False}) | |
if "resource_access" not in decoded.keys(): | |
response_body = {'error': 'no resource_access profiles in token'} | |
response_body = json.dumps(response_body) | |
return response_body, 401, {'WWW-Authenticate': 'Bearer'} | |
if client in decoded['resource_access'].keys(): | |
if role in decoded['resource_access'][client]['roles']: | |
return view_func(*args, **kwargs) | |
else: | |
response_body = {'error': 'role not in resource_access'} | |
response_body = json.dumps(response_body) | |
return response_body, 401, {'WWW-Authenticate': 'Bearer'} | |
else: | |
response_body = {'error': 'client not in resource_access'} | |
response_body = json.dumps(response_body) | |
return response_body, 401, {'WWW-Authenticate': 'Bearer'} | |
return decorated | |
return wrapper |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment