Created
April 21, 2021 23:22
-
-
Save tsatsujnr139/f6031075eed4b62a630f12e82341f85a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from typing import Generator | |
from app import crud, models, schemas | |
from app.constants.role import Role | |
from app.core import security | |
from app.core.config import settings | |
from app.db.session import SessionLocal | |
from fastapi import Depends, HTTPException, Security, status | |
from fastapi.security import OAuth2PasswordBearer, SecurityScopes | |
from jose import jwt | |
from pydantic import ValidationError | |
from sqlalchemy.orm import Session | |
# OAuth2 password-bearer scheme shared by the dependencies in this module.
# Each role is advertised as a scope, keyed by the role's "name" and
# described by its "description".
reusable_oauth2 = OAuth2PasswordBearer(
    tokenUrl=f"{settings.API_V1_STR}/auth/access-token",
    scopes={
        role["name"]: role["description"]
        for role in (
            Role.GUEST,
            Role.ACCOUNT_ADMIN,
            Role.ACCOUNT_MANAGER,
            Role.ADMIN,
            Role.SUPER_ADMIN,
        )
    },
)

# NOTE(review): basicConfig configures the root logger as an import-time side
# effect of this module; kept as-is because other code may rely on it.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def get_db() -> Generator:
    """Yield a database session and close it once the consumer is done.

    Yields:
        The object returned by ``SessionLocal()``.

    Raises:
        Whatever ``SessionLocal()`` raises, unchanged.
    """
    # Create the session before entering the try block: if construction
    # fails there is nothing to close, and the original error must not be
    # masked by an UnboundLocalError raised from the cleanup code.
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
def get_current_user(
    security_scopes: SecurityScopes,
    db: Session = Depends(get_db),
    token: str = Depends(reusable_oauth2),
) -> models.User:
    """Resolve the bearer token to a user and enforce the required scopes.

    Args:
        security_scopes: Scopes required by the endpoint and its dependencies.
        db: Database session used to look up the user.
        token: Encoded JWT supplied by the OAuth2 bearer scheme.

    Returns:
        The user returned by ``crud.user.get`` for the ``id`` in the token.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded or validated, has no ``id`` claim, or no user
            is found for that id; 401 "Not enough permissions" when scopes
            are required and the token's role is empty or not among them.
    """
    required_scopes = security_scopes.scopes
    if required_scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    auth_headers = {"WWW-Authenticate": authenticate_value}

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers=auth_headers,
    )

    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        if payload.get("id") is None:
            raise credentials_exception
        token_data = schemas.TokenPayload(**payload)
    except (jwt.JWTError, ValidationError) as err:
        logger.exception("Error Decoding Token")
        # An undecodable or invalid token is an authentication failure, so
        # it gets the same 401 + WWW-Authenticate response as every other
        # credential failure in this function (previously a bare 403).
        raise credentials_exception from err

    user = crud.user.get(db, id=token_data.id)
    if not user:
        raise credentials_exception

    # Scopes are only enforced when the endpoint declares some; the token's
    # role must then be present and be one of them.
    role = token_data.role
    if required_scopes and (not role or role not in required_scopes):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not enough permissions",
            headers=auth_headers,
        )
    return user
def get_current_active_user(
    current_user: models.User = Security(get_current_user, scopes=[],),
) -> models.User:
    """Return the authenticated user, rejecting inactive ones.

    Args:
        current_user: User resolved by ``get_current_user``.

    Returns:
        The same user when ``crud.user.is_active`` reports it as active.

    Raises:
        HTTPException: 400 "Inactive user" otherwise.
    """
    if crud.user.is_active(current_user):
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment