Last active
October 17, 2023 12:49
-
-
Save jdretz/06d3c80c5c8b279bc727a5eaba353787 to your computer and use it in GitHub Desktop.
User token authentication with FastAPI, Auth0, and Neo4j
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Annotated | |
from neo4j.exceptions import Neo4jError | |
from exceptions.custom_exceptions import DatabaseError | |
from fastapi import Depends | |
from db import driver | |
from user.work import get_user | |
from user.model import User, UserInDB | |
from .validate_token import validate_token | |
from util.models.types import JWTPayload | |
def active_user_in_db(
    decodePayload: Annotated[JWTPayload, Depends(validate_token)]
) -> UserInDB:
    """Find user in database matching Auth0 `sub` parameter in JWT Payload.

    Args:
        decodePayload: JWT payload injected by FastAPI from the
            `validate_token` dependency; only its `sub` attribute is read.

    Returns:
        Whatever the `get_user` unit of work returns for that `sub`
        (annotated there as `UserInDB`).

    Raises:
        DatabaseError: If the driver raises a `Neo4jError` while running the
            read transaction. The driver error is chained as the cause.
    """
    # Value to identify user in database
    sub = decodePayload.sub
    # Create session for a sequence of transactions
    with driver.session() as session:
        try:
            # Run the user lookup as a managed read transaction
            return session.execute_read(get_user, sub=sub)
        except Neo4jError as e:
            print(e.message)
            # Chain the driver error so the root cause stays in the traceback
            raise DatabaseError() from e
def active_user(userInDb: Annotated[UserInDB, Depends(active_user_in_db)]) -> User:
    """Initiates a `User` from the value resolved by `active_user_in_db`.

    The resolved value is unpacked as keyword arguments into `User`, so it
    must behave like a mapping. The intent is to strip sensitive information;
    presumably fields not declared on `User` are dropped by the model --
    confirm against the pydantic configuration in use.
    """
    return User(**userInDb)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydantic_settings import BaseSettings, SettingsConfigDict | |
# https://fastapi.tiangolo.com/advanced/settings/#reading-a-env-file | |
class Settings(BaseSettings):
    # Auth0 configuration values. Every field is a required string with no
    # default; `model_config` below names `.env` as the env file to read.
    auth0_audience: str
    auth0_domain: str
    auth0_post_user_sign_up_key: str
    auth0_client_id: str
    auth0_client_secret: str

    model_config = SettingsConfigDict(env_file=".env")


# Single settings instance, created when this module is imported.
settings = Settings()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import HTTPException, status | |
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py | |
class BadCredentialsException(HTTPException):
    """HTTP 401 error with the detail "Bad credentials"."""

    def __init__(self) -> None:
        super().__init__(
            detail="Bad credentials",
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py | |
class RequiresAuthenticationException(HTTPException):
    """HTTP 401 error with the detail "Requires authentication"."""

    def __init__(self) -> None:
        super().__init__(
            detail="Requires authentication",
            status_code=status.HTTP_401_UNAUTHORIZED,
        )
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py | |
class UnableCredentialsException(HTTPException):
    """HTTP 500 error with the detail "Unable to verify credentials"."""

    def __init__(self) -> None:
        super().__init__(
            detail="Unable to verify credentials",
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
class DatabaseError(HTTPException):
    """Ambiguous database error, reported as HTTP 500 "Database error."."""

    def __init__(self) -> None:
        super().__init__(
            detail="Database error.",
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
class AuthenticationProviderException(HTTPException):
    """Ambiguous error from Auth0, reported as HTTP 500."""

    def __init__(self) -> None:
        super().__init__(
            detail="Authentication provider error.",
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from auth0.authentication import GetToken | |
from .config import settings | |
# Auth0 application credentials, read once from the shared settings object.
domain = settings.auth0_domain
client_id = settings.auth0_client_id
client_secret = settings.auth0_client_secret

# Module-level `GetToken` client built from the credentials above.
get_token = GetToken(domain, client_id, client_secret)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
import jwt | |
from exceptions.custom_exceptions import ( | |
BadCredentialsException, | |
UnableCredentialsException, | |
) | |
from .config import settings | |
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/json_web_token.py | |
@dataclass
class JSONWebToken:
    """Perform JSON Web Token (JWT) validation using PyJWT.

    The signing key is looked up from the JWKS endpoint at `jwks_uri`, then
    the token is decoded with the configured algorithm, audience and issuer.
    """

    jwt_access_token: str
    auth0_issuer_url: str = f"https://{settings.auth0_domain}/"
    auth0_audience: str = settings.auth0_audience
    algorithm: str = "RS256"
    # NOTE: this default is evaluated once, at class-definition time, from the
    # default issuer URL above; passing a different `auth0_issuer_url` to an
    # instance does not change it.
    jwks_uri: str = f"{auth0_issuer_url}.well-known/jwks.json"

    # Deliberately un-annotated so `@dataclass` leaves it as a plain class
    # attribute shared by all instances instead of turning it into a field.
    # Maps a JWKS URI to the `PyJWKClient` created for it.
    _jwks_clients = {}

    def validate(self):
        """Validate `jwt_access_token` and return its decoded payload.

        Returns:
            The value returned by `jwt.decode` for the token.

        Raises:
            UnableCredentialsException: If the JWKS client raises
                `PyJWKClientError` while resolving the signing key.
            BadCredentialsException: If PyJWT raises `InvalidTokenError`
                for the token.
        """
        try:
            # Reuse one client per JWKS URI rather than building a new one for
            # every token; a client that is thrown away after a single call
            # cannot benefit from any caching it does internally.
            jwks_client = self._jwks_clients.get(self.jwks_uri)
            if jwks_client is None:
                jwks_client = jwt.PyJWKClient(self.jwks_uri)
                self._jwks_clients[self.jwks_uri] = jwks_client
            jwt_signing_key = jwks_client.get_signing_key_from_jwt(
                self.jwt_access_token
            ).key
            payload = jwt.decode(
                self.jwt_access_token,
                jwt_signing_key,
                # Pass a list: with a bare string the allowed-algorithm check
                # would be a substring test rather than an exact match.
                algorithms=[self.algorithm],
                audience=self.auth0_audience,
                issuer=self.auth0_issuer_url,
            )
        except jwt.exceptions.PyJWKClientError as e:
            raise UnableCredentialsException() from e
        except jwt.exceptions.InvalidTokenError as e:
            print(e)
            raise BadCredentialsException() from e
        return payload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydantic import BaseModel | |
from typing import Literal | |
class UserAuthentication(BaseModel):
    # Username/password pair; both fields are required strings.
    username: str
    password: str
class User(BaseModel):
    # User fields: a name and a role, both required strings.
    name: str
    role: str
class UserInDB(User):
    # Extends `User` with two more required string fields: `sub` and `email`.
    sub: str
    email: str
class Auth0UsernamePasswordCredentials(BaseModel):
    # Token response shape; `token_type` only accepts the literal "Bearer".
    access_token: str
    expires_in: int
    token_type: Literal["Bearer"]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from neo4j import Record | |
from fastapi import HTTPException | |
def raise_404_if_empty(single_record: Record | None):
    """Raise a 404 `HTTPException` when `single_record` is `None`.

    A missing record is taken to mean the requested value does not exist.
    Any non-`None` record passes through and the function returns `None`.
    """
    if single_record is not None:
        return
    raise HTTPException(status_code=404, detail="Not found")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import APIRouter, Depends | |
from auth0 import Auth0Error | |
from security.active_user import active_user | |
from security.get_token import get_token | |
from exceptions.custom_exceptions import AuthenticationProviderException | |
from .model import ( | |
User, | |
UserAuthentication, | |
Auth0UsernamePasswordCredentials, | |
) | |
from util.models.types import HTTPException | |
from security.config import settings | |
# Router for the user endpoints; every route below is mounted under "/user".
router = APIRouter(prefix="/user")
@router.get(
    "/",
    name="User",
    description="Get the authenticated client user in database.",
    response_model=User,
    responses={401: {"model": HTTPException}},
)
async def root(user: User = Depends(active_user)):
    """Return the `User` resolved by the `active_user` dependency unchanged."""
    return user
# The login route is used for retrieving a user token when testing with | |
# an API client (i.e. Postman) and is not how the user will normally | |
# authenticate to use the application. | |
@router.post(
    "/",
    description=(
        "Login user using Auth0 with Username Password Authentication to get access"
        " token for API."
    ),
    response_model=Auth0UsernamePasswordCredentials,
)
async def login(
    raw_user_credentials: UserAuthentication,
):
    """Exchange a username and password for an Auth0 access token.

    Args:
        raw_user_credentials: Request body carrying `username` and `password`.

    Returns:
        The response of `get_token.login`, serialized by FastAPI through
        `Auth0UsernamePasswordCredentials`.

    Raises:
        AuthenticationProviderException: If the Auth0 client raises
            `Auth0Error`. The original error is chained as the cause.
    """
    # Local import keeps this handler self-contained.
    import asyncio

    try:
        # `get_token.login` is invoked as a plain (non-awaited) call, i.e. it
        # is synchronous; run it in a worker thread so the event loop is not
        # blocked while waiting on Auth0.
        return await asyncio.to_thread(
            get_token.login,
            username=raw_user_credentials.username,
            password=raw_user_credentials.password,
            realm="Username-Password-Authentication",
            audience=settings.auth0_audience,
        )
    except Auth0Error as e:
        # TODO: Log error
        print(e.message)
        raise AuthenticationProviderException() from e
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Annotated | |
from fastapi import Depends | |
from fastapi.security import HTTPBearer | |
from fastapi.security.http import HTTPAuthorizationCredentials | |
from .json_web_token import JSONWebToken | |
from util.models.types import JWTPayload | |
# FastAPI 'Dependency' for parsing Authorization header `Bearer [token]` | |
token_auth_scheme = HTTPBearer()


def validate_token(
    token: Annotated[HTTPAuthorizationCredentials, Depends(token_auth_scheme)]
) -> JWTPayload:
    """Validates Bearer JWT with Auth0 configuration.

    The credentials string from the bearer scheme is handed to
    `JSONWebToken.validate`, and the decoded claims are unpacked into a
    `JWTPayload`.
    """
    verifier = JSONWebToken(token.credentials)
    claims = verifier.validate()
    return JWTPayload(**claims)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from neo4j import ManagedTransaction | |
from .model import UserInDB | |
from exceptions.raise_404_if_empty import raise_404_if_empty | |
# Define a Unit of work to run within a Transaction (`tx`) | |
def get_user(tx: ManagedTransaction, sub: str) -> UserInDB:
    """Look up the `User` node whose `sub` property equals `sub`.

    Runs inside the transaction `tx`. If the query yields no record,
    `raise_404_if_empty` raises; otherwise the record's value is returned.
    """
    query = """
        MATCH (u:User { sub: $sub })
        RETURN u as user
        """
    record = tx.run(query, sub=sub).single()
    raise_404_if_empty(record)
    return record.value()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment