Skip to content

Instantly share code, notes, and snippets.

@tsatsujnr139
Created April 21, 2021 23:22
Show Gist options
  • Save tsatsujnr139/f6031075eed4b62a630f12e82341f85a to your computer and use it in GitHub Desktop.
Save tsatsujnr139/f6031075eed4b62a630f12e82341f85a to your computer and use it in GitHub Desktop.
import logging
from typing import Generator
from app import crud, models, schemas
from app.constants.role import Role
from app.core import security
from app.core.config import settings
from app.db.session import SessionLocal
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/access-token",
scopes={
Role.GUEST["name"]: Role.GUEST["description"],
Role.ACCOUNT_ADMIN["name"]: Role.ACCOUNT_ADMIN["description"],
Role.ACCOUNT_MANAGER["name"]: Role.ACCOUNT_MANAGER["description"],
Role.ADMIN["name"]: Role.ADMIN["description"],
Role.SUPER_ADMIN["name"]: Role.SUPER_ADMIN["description"],
},
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_db() -> Generator:
try:
db = SessionLocal()
yield db
finally:
db.close()
def get_current_user(
security_scopes: SecurityScopes,
db: Session = Depends(get_db),
token: str = Depends(reusable_oauth2),
) -> models.User:
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
)
if payload.get("id") is None:
raise credentials_exception
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
logger.error("Error Decoding Token", exc_info=True)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
user = crud.user.get(db, id=token_data.id)
if not user:
raise credentials_exception
if security_scopes.scopes and not token_data.role:
raise HTTPException(
status_code=401,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
if (
security_scopes.scopes
and token_data.role not in security_scopes.scopes
):
raise HTTPException(
status_code=401,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user
def get_current_active_user(
current_user: models.User = Security(get_current_user, scopes=[],),
) -> models.User:
if not crud.user.is_active(current_user):
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment