Last active
January 19, 2023 15:04
-
-
Save thomaxxl/aafc9975539bd7a4adb2e5cc10f5d947 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
System support for login / authentication. | |
Applications POST to login to obtain an access token, | |
which they provide in the header of subsequent requests. | |
e.g. | |
def login(): | |
post_uri = 'http://localhost:5656/api/authentication-User/login' | |
post_data = {"username": "_user_","password": "_password_"} | |
r = requests.post(url=post_uri, json = post_data) | |
response_text = r.text | |
status_code = r.status_code | |
if status_code > 300: | |
raise Exception(f'POST login failed with {r.text}') | |
result_data = json.loads(response_text) | |
result_map = DotMap(result_data) | |
token = result_map.access_token | |
header = {'Authorization': 'Bearer {}'.format(f'{token}')} | |
return header | |
""" | |
import logging, sys | |
from urllib import response | |
from flask import Flask, abort, jsonify, request, current_app | |
from flask_jwt_extended import JWTManager, verify_jwt_in_request, create_access_token, set_access_cookies | |
from datetime import timedelta | |
from functools import wraps | |
from typing import Any | |
import config | |
# Provider object taken from the project configuration; this module calls
# its get_user(...) to look users up at login and when resolving a token.
authentication_provider = config.Config.SECURITY_PROVIDER

# Logger used for all authentication / authorization messages in this module.
security_logger = logging.getLogger('api_logic_server_app')
def jwt_required(
    optional: bool = False,
    fresh: bool = False,
    refresh: bool = False,
    locations: list = None,
    verify_type: bool = True,
) -> Any:
    """
    Modified flask_jwt_extended.jwt_required()

    We exclude the token auth login endpoint from jwt and set the access
    token as a cookie.  Every other endpoint must pass
    ``verify_jwt_in_request``; otherwise the request is aborted with 401.

    Args:
        optional: forwarded to ``verify_jwt_in_request``.
        fresh: forwarded to ``verify_jwt_in_request``.
        refresh: forwarded to ``verify_jwt_in_request``.
        locations: forwarded to ``verify_jwt_in_request``.
        verify_type: forwarded to ``verify_jwt_in_request``.

    Returns:
        A decorator that wraps a view function with the checks above.
    """
    def wrapper(fn):
        @wraps(fn)
        def decorator(*api_args, **api_kwargs):
            if request.endpoint == 'api.authentication-User.login':
                # Login itself cannot require a token: run it unprotected and
                # mirror the issued token into a cookie on success.
                response = current_app.ensure_sync(fn)(*api_args, **api_kwargs)
                if response.status_code == 200:
                    payload = response.get_json(silent=True)
                    access_token = (
                        payload.get("access_token")
                        if isinstance(payload, dict)
                        else None
                    )
                    # Only set the cookie when a token was actually returned.
                    if access_token:
                        set_access_cookies(response, access_token)
                return response

            # Keep the try body limited to the verification call so that
            # errors raised by the view itself are not reported as 401.
            try:
                verify_jwt_in_request(
                    optional=optional,
                    fresh=fresh,
                    refresh=refresh,
                    locations=locations,
                    verify_type=verify_type,
                )
            except Exception as exc:
                security_logger.warning(f"JWT verification failed: {exc}")
                abort(401, 'Unauthorized')

            return current_app.ensure_sync(fn)(*api_args, **api_kwargs)

        return decorator

    return wrapper
def configure_auth(flask_app: Flask, database: object, method_decorators: object):
    """
    Called on server start by api_logic_server_run to
    - initialize jwt
    - establish Flask end points for login.

    Args:
        flask_app (Flask): application that receives the JWT configuration,
            the JWTManager and the ``/auth/login`` route.
        database (object): not used by this function.
        method_decorators (object): list-like collection; the ``jwt_required``
            decorator is appended to it.

    Returns:
        None
    """
    flask_app.config["PROPAGATE_EXCEPTIONS"] = True
    # Respect a secret that was already configured; only fall back to the
    # well-known built-in value (and say so) when none is present.
    if not flask_app.config.get("JWT_SECRET_KEY"):
        security_logger.warning(
            "JWT_SECRET_KEY is not configured -- using the built-in default; "
            "set your own secret before deploying"
        )
        flask_app.config["JWT_SECRET_KEY"] = "ApiLogicServerSecret"  # Change this!
    flask_app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=222)  # th longer exp
    flask_app.config["JWT_REFRESH_TOKEN_EXPIRES"] = timedelta(days=30)
    jwt = JWTManager(flask_app)

    @flask_app.route("/auth/login", methods=["POST"])
    def login():
        """
        Post id/password, returns token to be placed in header of subsequent requests.

        Returns:
            JSON response carrying ``access_token``, or a 401 JSON message
            when the credentials are missing or rejected.
        """
        # silent=True: a missing or malformed JSON body yields None instead
        # of raising, so it is answered like any other failed login.
        credentials = request.get_json(silent=True)
        if not isinstance(credentials, dict):
            credentials = {}
        username = credentials.get("username")
        password = credentials.get("password")
        if username is None or password is None:
            return jsonify("Wrong username or password"), 401

        user = authentication_provider.get_user(username, password)  # val - use auth_provider
        if not user or not user.check_password(password):  # TODO avoid model method? += provider?
            return jsonify("Wrong username or password"), 401

        access_token = create_access_token(identity=user)  # serialize and encode
        return jsonify(access_token=access_token)

    @jwt.user_identity_loader
    def user_identity_lookup(user):
        # The token identity ("sub") is the user's id.
        return user.id

    @jwt.user_lookup_loader
    def user_lookup_callback(_jwt_header, jwt_data):
        # Resolve the identity stored in the token back to a user object.
        identity = jwt_data["sub"]
        return authentication_provider.get_user(identity, "")  # val - use auth_provider

    method_decorators.append(
        jwt_required(locations=["headers", "cookies", "json", "query_string"])
    )
    security_logger.info("\nAuthentication loaded -- api calls now require authorization header")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment