FastAPI Basic Authentication - Based on
from typing import Optional
import base64
from passlib.context import CryptContext
from datetime import datetime, timedelta
import jwt
from jwt import PyJWTError
from pydantic import BaseModel
from fastapi import Depends, FastAPI, HTTPException
from fastapi.encoders import jsonable_encoder
from import OAuth2PasswordRequestForm, OAuth2
from import SecurityBase
from import get_authorization_scheme_param
from import get_swagger_ui_html
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.openapi.utils import get_openapi
from starlette.status import HTTP_403_FORBIDDEN
from starlette.responses import RedirectResponse, Response, JSONResponse
from starlette.requests import Request
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str = None
class User(BaseModel):
username: str
email: str = None
full_name: str = None
disabled: bool = None
class UserInDB(User):
hashed_password: str
class OAuth2PasswordBearerCookie(OAuth2):
def __init__(
tokenUrl: str,
scheme_name: str = None,
scopes: dict = None,
auto_error: bool = True,
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(flows=flows, scheme_name=scheme_name, auto_error=auto_error)
async def __call__(self, request: Request) -> Optional[str]:
header_authorization: str = request.headers.get("Authorization")
cookie_authorization: str = request.cookies.get("Authorization")
header_scheme, header_param = get_authorization_scheme_param(
cookie_scheme, cookie_param = get_authorization_scheme_param(
if header_scheme.lower() == "bearer":
authorization = True
scheme = header_scheme
param = header_param
elif cookie_scheme.lower() == "bearer":
authorization = True
scheme = cookie_scheme
param = cookie_param
authorization = False
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
return None
return param
class BasicAuth(SecurityBase):
def __init__(self, scheme_name: str = None, auto_error: bool = True):
self.scheme_name = scheme_name or self.__class__.__name__
self.auto_error = auto_error
async def __call__(self, request: Request) -> Optional[str]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "basic":
if self.auto_error:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Not authenticated"
return None
return param
basic_auth = BasicAuth(auto_error=False)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearerCookie(tokenUrl="/token")
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
async def homepage():
return "Welcome to the security test!""/token", response_model=Token)
async def route_login_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
return {"access_token": access_token, "token_type": "bearer"}
async def route_logout_and_remove_cookie():
response = RedirectResponse(url="/")
response.delete_cookie("Authorization", domain="")
return response
async def login_basic(auth: BasicAuth = Depends(basic_auth)):
if not auth:
response = Response(headers={"WWW-Authenticate": "Basic"}, status_code=401)
return response
decoded = base64.b64decode(auth).decode("ascii")
username, _, password = decoded.partition(":")
user = authenticate_user(fake_users_db, username, password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect email or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": username}, expires_delta=access_token_expires
token = jsonable_encoder(access_token)
response = RedirectResponse(url="/docs")
value=f"Bearer {token}",
return response
response = Response(headers={"WWW-Authenticate": "Basic"}, status_code=401)
return response
async def get_open_api_endpoint(current_user: User = Depends(get_current_active_user)):
return JSONResponse(get_openapi(title="FastAPI", version=1, routes=app.routes))
async def get_documentation(current_user: User = Depends(get_current_active_user)):
return get_swagger_ui_html(openapi_url="/openapi.json", title="docs")
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
