Skip to content

Instantly share code, notes, and snippets.

@weiyuDatawiza
Created August 24, 2022 10:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weiyuDatawiza/6c835564a8afd4114e9e0a88d95ec37f to your computer and use it in GitHub Desktop.
Save weiyuDatawiza/6c835564a8afd4114e9e0a88d95ec37f to your computer and use it in GitHub Desktop.
Superset middleware to use AUTH_REMOTE_USER authentication method by Datawiza
# This code follows the naming convention here:
# https://google.github.io/styleguide/pyguide.html#316-naming
from enum import Enum
import os
import jwt
from flask import redirect, g, flash, request, current_app
from flask_appbuilder._compat import as_unicode
from flask_appbuilder.security.manager import AUTH_REMOTE_USER
from flask_appbuilder.security.views import AuthRemoteUserView
from flask_appbuilder.security.views import expose
from flask_login import login_user, logout_user
from superset.security import SupersetSecurityManager
# The keys used by/as:
# 1. The incoming JWT token.
# 2. The local DB user.
# 3. The attributes of the UserInfo class.
# NOTE: Do NOT change these keys unless you know what you are doing.
USERNAME_KEY = "username"
FIRSTNAME_KEY = "first_name"
LASTNAME_KEY = "last_name"
EMAIL_KEY = "email"
ROLE_KEY = "role"
DEFAULT_USER_KEY = "email"
DW_TOKEN_KEYWORD = "dw-token"
JWT_ALGO = "HS256"
JWT_SECRET_KEYWORD = "JWT_SECRET"
ROLE_SEPARATOR = ":"
DEFAULT_ROLE = "Gamma"
# When True, the user info in local DB will be updated
# if it's different from the info in IdP.
# NOTE: The logic to update the role/group info in the local DB
# is different than others claims.
# If the role from IdP is not empty or None, that role will always
# be used to update the local DB. However, if the role from IdP is
# empty or None, the local DB role will not be updated.
UPDATE_LOCAL_USER_INFO = True
class IdpUserInfoSource(Enum):
# User info is transmitted via headers of the HTTP response
HEADER = 1
# User info is transmitted as JWT TOKEN in the HTTP response
JWT_TOKEN = 2
# Specifies which source to retrieve the the user information.
USER_INFO_SOURCE = IdpUserInfoSource.JWT_TOKEN
class UserInfo:
def __init__(self, userinfo_dict: dict):
"""
Instantiate a UserInfo from a dictionary object with user information,
which must have the following keys: username, firstname,
lastname, email and role.
"""
self.username = userinfo_dict.get(USERNAME_KEY)
self.first_name = userinfo_dict.get(FIRSTNAME_KEY)
self.last_name = userinfo_dict.get(LASTNAME_KEY)
self.email = userinfo_dict.get(EMAIL_KEY)
self.role = userinfo_dict.get(ROLE_KEY)
self.uid = userinfo_dict.get(DEFAULT_USER_KEY)
def __str__(self) -> str:
return f"""username: {self.username},
firstname: {self.first_name},
lastname: {self.last_name},
email: {self.email},
role: {self.role},
uid: {self.uid}"""
def is_valid(self) -> bool:
return all(
[
self.email,
self.first_name,
self.last_name,
self.username,
# self.role,
self.uid,
]
)
def validate_roles_to_list(sm, roles_str: str) -> list:
if not roles_str:
return []
_roles = roles_str.split(ROLE_SEPARATOR)
# Make sure the input role is valid in the SuperSet role set.
# All invalid roles will become None.
roles = [sm.find_role(role_name) for role_name in _roles]
# Remove all invalid roles (None).
return list(filter(None, roles))
def new_roles_with_default(
sm, idp_roles: list, local_db_roles: list, default_role: str
) -> list:
"""
Decides the new roles based on the condition of both the roles from IdP and local DB.
All input roles are assumed to have been cleaned and valid in the SuperSet system.
If the IdP roles are not empty or None, the IdP roles will be returned;
Otherwise,
if the local DB roles are empty or None:
A default role will be returned;
if local DB roles are not empty or None:
The local DB roles will be returned;
"""
if idp_roles:
return idp_roles
else:
if not local_db_roles:
return [sm.find_role(default_role)]
return local_db_roles
class RemoteUserMiddleware:
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
user = environ.pop("HTTP_PROXY_REMOTE_USER", None)
environ["REMOTE_USER"] = user
return self.app(environ, start_response)
ADDITIONAL_MIDDLEWARE = [
RemoteUserMiddleware,
]
class DatawizaCustomRemoteUserView(AuthRemoteUserView):
# Leave blank
login_template = ""
@expose("/logout/")
def logout(self):
logout_user()
return redirect("/datawiza/ab-logout")
@expose("/login/")
def login(self):
# The user information obtained from the identity provider.
idp_user_info = self.extract_idp_user_info()
if not idp_user_info:
return "Error occurs when retriving user info from IdP."
if not idp_user_info.uid:
flash(as_unicode(self.invalid_login_message), "warning")
if not idp_user_info.is_valid():
current_app.logger.error(
f"Invalid user info from identity provider: {idp_user_info}"
)
return "The user info is invalid."
if g.user is not None and g.user.is_authenticated:
return redirect(self.appbuilder.get_url_for_index)
sm = self.appbuilder.sm
session = sm.get_session
local_user = (
session.query(sm.user_model).filter_by(email=idp_user_info.uid).first()
)
# If the user does not exist in local DB, we will create a user in the local DB
# with a default role.
# username, firstname, lastname, email, and role are required.
# NOTE: username and email should be unique per SuperSet's design.
if local_user is None:
roles = new_roles_with_default(
sm, validate_roles_to_list(sm, idp_user_info.role), None, DEFAULT_ROLE
)
try:
user = sm.add_user(
idp_user_info.username,
idp_user_info.first_name,
idp_user_info.last_name,
idp_user_info.email,
roles,
)
login_user(user, remember=False)
return redirect(self.appbuilder.get_url_for_index)
except AttributeError as ex:
current_app.logger.error("Error occurs when creating user: %s", ex)
return (
f"Sorry. Error occurs when creating user for {idp_user_info.uid}."
)
# If you use user's email as identifier, you should use auth_user_oid which takes email as parameter.
# Ref: https://flask-appbuilder.readthedocs.io/en/latest/api.html#flask_appbuilder.security.manager.BaseSecurityManager.auth_user_oid
# Otherwise, if you'd like to use user's username, you can use auth_user_remote_user which take username as parameter.
# user = self.appbuilder.sm.auth_user_remote_user(username)
# Ref: https://flask-appbuilder.readthedocs.io/en/latest/api.html#flask_appbuilder.security.manager.BaseSecurityManager.auth_user_remote_user
local_user = self.appbuilder.sm.auth_user_oid(idp_user_info.uid)
if local_user is None:
flash(as_unicode(self.invalid_login_message), "warning")
# We will update user info from IdP if UPDATE_LOCAL_USER_INFO set to True.
# However, any attribute set empty in IdP will NOT be updated.
if UPDATE_LOCAL_USER_INFO:
self.check_and_update_local_user_info(idp_user_info, local_user, sm)
try:
login_user(local_user)
except AttributeError as ex:
current_app.logger.error("Error occurs when logging in user: %s", ex)
return "Error occurs when logging in user."
return redirect(self.appbuilder.get_url_for_index)
def extract_idp_user_info(self) -> UserInfo:
if USER_INFO_SOURCE == IdpUserInfoSource.HEADER:
# Simply read request headers. Username is necessary.
return UserInfo(request.headers)
elif USER_INFO_SOURCE == IdpUserInfoSource.JWT_TOKEN:
# Read env variable jwt_secret.
# Ref: https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
jwt_secret = os.getenv(JWT_SECRET_KEYWORD)
if not jwt_secret:
current_app.logger.error(
f"Cannot get env variable {JWT_SECRET_KEYWORD} for decoding JWT."
)
raise Exception("Missing JWT secret configuration.")
try:
# Use variable jwt_secret to decode JWT.
dw_token = request.headers.get(DW_TOKEN_KEYWORD)
token_payload = jwt.decode(dw_token, jwt_secret, algorithms=[JWT_ALGO])
return UserInfo(token_payload)
except Exception as ex:
current_app.logger.error(
f"Error occurs when retrieving user info from JWT: {ex}"
)
return None
else:
current_app.logger.error(
f"Unsupport IdP user info source: {USER_INFO_SOURCE}"
)
return None
def check_and_update_local_user_info(
self, idp_user_info: UserInfo, local_user_info, sm
):
"""
Check if there is any update on the user information from the identity provider.
If there is any, update the user information in the local DB with the new information
from IdP.
NOTE: This assumes that the Identity Provider is the single source of truth for basic
user information, including: username, firstname, last, email.
"""
has_update = False
fields = [USERNAME_KEY, FIRSTNAME_KEY, LASTNAME_KEY, EMAIL_KEY]
for field in fields:
idp_info = getattr(idp_user_info, field)
local_info = getattr(local_user_info, field)
if idp_info and idp_info != local_info:
setattr(local_user_info, field, idp_info)
has_update = True
# Roles require special handling.
new_roles = new_roles_with_default(
sm,
validate_roles_to_list(sm, idp_user_info.role),
local_user_info.roles,
DEFAULT_ROLE,
)
if new_roles and set(new_roles) != set(local_user_info.roles):
local_user_info.roles = new_roles
has_update = True
if has_update:
try:
sm.update_user(local_user_info)
except AttributeError as ex:
current_app.logger.error("Error occurs when updating user info: %s", ex)
return "Error occurs when updating user info."
class DatawizaCustomSecurityManager(SupersetSecurityManager):
authremoteuserview = DatawizaCustomRemoteUserView
AUTH_TYPE = AUTH_REMOTE_USER
CUSTOM_SECURITY_MANAGER = DatawizaCustomSecurityManager
# If the load balancer is inserting X-Forwarded-For/X-Forwarded-Proto headers,
# you should set ENABLE_PROXY_FIX = True in the superset config file (superset_config.py) to extract and use the headers.
# Ref: https://superset.apache.org/docs/installation/configuring-superset/#configuration-behind-a-load-balancer
ENABLE_PROXY_FIX = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment