Skip to content

Instantly share code, notes, and snippets.

@jdretz
Last active October 17, 2023 12:49
Show Gist options
  • Save jdretz/06d3c80c5c8b279bc727a5eaba353787 to your computer and use it in GitHub Desktop.
Save jdretz/06d3c80c5c8b279bc727a5eaba353787 to your computer and use it in GitHub Desktop.
User token authentication with FastAPI, Auth0, and Neo4j
from typing import Annotated
from neo4j.exceptions import Neo4jError
from exceptions.custom_exceptions import DatabaseError
from fastapi import Depends
from db import driver
from user.work import get_user
from user.model import User, UserInDB
from .validate_token import validate_token
from util.models.types import JWTPayload
def active_user_in_db(
decodePayload: Annotated[JWTPayload, Depends(validate_token)]
) -> User:
"""Find user in database matching Auth0 `sub` parameter in JWT Payload"""
# Create session for a sequence of transactions
with driver.session() as session:
# Value to identify user in database
sub = decodePayload.sub
try:
# Commit user query
userInDb = session.execute_read(get_user, sub=sub)
return userInDb
except Neo4jError as e:
print(e.message)
raise DatabaseError
def active_user(userInDb: Annotated[UserInDB, Depends(active_user_in_db)]) -> User:
"""Initiates a `User` from a `UserInDB` model"""
# Remove sensitive information
user = User(**userInDb)
return user
from pydantic_settings import BaseSettings, SettingsConfigDict
# https://fastapi.tiangolo.com/advanced/settings/#reading-a-env-file
class Settings(BaseSettings):
auth0_audience: str
auth0_domain: str
auth0_post_user_sign_up_key: str
auth0_client_id: str
auth0_client_secret: str
model_config = SettingsConfigDict(env_file=".env")
settings = Settings()
from fastapi import HTTPException, status
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py
class BadCredentialsException(HTTPException):
def __init__(self):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Bad credentials"
)
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py
class RequiresAuthenticationException(HTTPException):
def __init__(self):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Requires authentication"
)
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/custom_exceptions.py
class UnableCredentialsException(HTTPException):
def __init__(self):
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Unable to verify credentials",
)
class DatabaseError(HTTPException):
"""Ambiguous database error"""
def __init__(
self,
) -> None:
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Database error.",
)
class AuthenticationProviderException(HTTPException):
"""Ambigous error from Auth0"""
def __init__(self):
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Authentication provider error.",
)
from auth0.authentication import GetToken
from .config import settings
domain = settings.auth0_domain
client_id = settings.auth0_client_id
client_secret = settings.auth0_client_secret
get_token = GetToken(
domain,
client_id,
client_secret,
)
from dataclasses import dataclass
import jwt
from exceptions.custom_exceptions import (
BadCredentialsException,
UnableCredentialsException,
)
from .config import settings
# https://github.com/auth0-developer-hub/api_fastapi_python_hello-world/blob/basic-authorization/application/json_web_token.py
@dataclass
class JSONWebToken:
"""Perform JSON Web Token (JWT) validation using PyJWT"""
jwt_access_token: str
auth0_issuer_url: str = f"https://{settings.auth0_domain}/"
auth0_audience: str = settings.auth0_audience
algorithm: str = "RS256"
jwks_uri: str = f"{auth0_issuer_url}.well-known/jwks.json"
def validate(self):
try:
jwks_client = jwt.PyJWKClient(self.jwks_uri)
jwt_signing_key = jwks_client.get_signing_key_from_jwt(
self.jwt_access_token
).key
payload = jwt.decode(
self.jwt_access_token,
jwt_signing_key,
algorithms=self.algorithm,
audience=self.auth0_audience,
issuer=self.auth0_issuer_url,
)
except jwt.exceptions.PyJWKClientError:
raise UnableCredentialsException
except jwt.exceptions.InvalidTokenError as e:
print(e)
raise BadCredentialsException
return payload
from pydantic import BaseModel
from typing import Literal
class UserAuthentication(BaseModel):
username: str
password: str
class User(BaseModel):
name: str
role: str
class UserInDB(User):
sub: str
email: str
class Auth0UsernamePasswordCredentials(BaseModel):
access_token: str
expires_in: int
token_type: Literal["Bearer"]
from neo4j import Record
from fastapi import HTTPException
def raise_404_if_empty(single_record: Record | None):
"""If record is none, assume the value doesn't exist and raise 404 exception"""
if single_record is None:
raise HTTPException(status_code=404, detail="Not found")
from fastapi import APIRouter, Depends
from auth0 import Auth0Error
from security.active_user import active_user
from security.get_token import get_token
from exceptions.custom_exceptions import AuthenticationProviderException
from .model import (
User,
UserAuthentication,
Auth0UsernamePasswordCredentials,
)
from util.models.types import HTTPException
from security.config import settings
router = APIRouter(
prefix="/user",
)
@router.get(
"/",
name="User",
description="Get the authenticated client user in database.",
response_model=User,
responses={401: {"model": HTTPException}},
)
async def root(user: User = Depends(active_user)):
return user
# The login route is used for retrieving a user token when testing with
# an API client (i.e. Postman) and is not how the user will normally
# authenticate to use the application.
@router.post(
"/",
description=(
"Login user using Auth0 with Username Password Authentication to get access"
" token for API."
),
response_model=Auth0UsernamePasswordCredentials,
)
async def login(
raw_user_credentials: UserAuthentication,
):
username = raw_user_credentials.username
password = raw_user_credentials.password
audience = settings.auth0_audience
try:
resp = get_token.login(
username=username,
password=password,
realm="Username-Password-Authentication",
audience=audience,
)
return resp
except Auth0Error as e:
# TODO: Log error
print(e.message)
raise AuthenticationProviderException
from typing import Annotated
from fastapi import Depends
from fastapi.security import HTTPBearer
from fastapi.security.http import HTTPAuthorizationCredentials
from .json_web_token import JSONWebToken
from util.models.types import JWTPayload
# FastAPI 'Dependency' for parsing Authorization header `Bearer [token]`
token_auth_scheme = HTTPBearer()
def validate_token(
token: Annotated[HTTPAuthorizationCredentials, Depends(token_auth_scheme)]
) -> JWTPayload:
"""Validates Bearer JWT with Auth0 configuration"""
payload = JSONWebToken(token.credentials).validate()
return JWTPayload(**payload)
from neo4j import ManagedTransaction
from .model import UserInDB
from exceptions.raise_404_if_empty import raise_404_if_empty
# Define a Unit of work to run within a Transaction (`tx`)
def get_user(tx: ManagedTransaction, sub: str) -> UserInDB:
result = tx.run(
"""
MATCH (u:User { sub: $sub })
RETURN u as user
""",
sub=sub,
)
record = result.single()
raise_404_if_empty(record)
return record.value()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment