Skip to content

Instantly share code, notes, and snippets.

@ScrimForever
Created April 19, 2024 00:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ScrimForever/6bd3470083835d4520c19fa5ca26217c to your computer and use it in GitHub Desktop.
Save ScrimForever/6bd3470083835d4520c19fa5ca26217c to your computer and use it in GitHub Desktop.
from typing import AsyncGenerator, List
from fastapi import Depends
from fastapi_users.db import (
SQLAlchemyBaseOAuthAccountTableUUID,
SQLAlchemyBaseUserTableUUID,
SQLAlchemyUserDatabase,
)
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, relationship
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
class Base(DeclarativeBase):
pass
class OAuthAccount(SQLAlchemyBaseOAuthAccountTableUUID, Base):
pass
class User(SQLAlchemyBaseUserTableUUID, Base):
oauth_accounts: Mapped[List[OAuthAccount]] = relationship(
"OAuthAccount", lazy="joined"
)
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)
async def create_db_and_tables():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
async with async_session_maker() as session:
yield session
async def get_session(session: AsyncSession = Depends(get_async_session)):
yield session
async def get_user_db(session: AsyncSession = Depends(get_async_session)):
yield SQLAlchemyUserDatabase(session, User, OAuthAccount)
from ...repository.setor.setor_repository import SetorRepository
from ...core.users import User
async def return_all_setor(user: User):
setor = SetorRepository()
return await setor.list_all(user)
from dataclasses import dataclass
from sqlalchemy.orm import Session
from ...core.db import get_async_session
from loguru import logger
from sqlalchemy import select
from ...models.setor_model import Setor
from ...schema.setor_schema import SetorSchemaInput
from fastapi import HTTPException
from starlette.status import HTTP_500_INTERNAL_SERVER_ERROR
@dataclass
class SetorRepository:
async def create_setor(self, setor: SetorSchemaInput):
pass
async def list_all(self, user):
query = select(Setor)
try:
logger.info(f'Usuário: {user.email}, listou todos os setores.')
...
except Exception as e:
logger.error(e)
raise HTTPException(status_code=HTTP_500_INTERNAL_SERVER_ERROR, detail="Error processing.")
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ..core.db import get_session
from ..core.users import User, current_active_user
from ..repository.setor.setor_repository import SetorRepository
from ..schema.setor_schema import (
SetorSchema,
SetorSchemaOutput,
)
from ..data.setor.setor_data import return_all_setor
from typing import List
from fastapi import status
setor_router = APIRouter()
@setor_router.get("/", response_model=List[SetorSchemaOutput])
async def get_all_setors(user: User = Depends(current_active_user)):
return_all_list = await return_all_setor(user=user)
return return_all_list
@setor_router.get("/{setor_id}", response_model=SetorSchemaOutput)
async def get_setor(
setor_id: int,
db: Session = Depends(get_session),
current_user: User = Depends(current_active_user),
):
response_get_setor = await SetorRepository(
setor_id=setor_id, db=db, current_user=current_user
).get_setor_by_id()
return response_get_setor
@setor_router.post(
"/", status_code=status.HTTP_201_CREATED, response_model=SetorSchemaOutput
)
async def create_setor(
setor_schema: SetorSchema,
db: Session = Depends(get_session),
current_user: User = Depends(current_active_user),
):
response_create_setor = await SetorRepository(
db=db, current_user=current_user, obj_schema_setor=setor_schema
).create_setor()
return response_create_setor
@setor_router.put("/{setor_id}", response_model=SetorSchemaOutput)
async def update_setor(
setor_id: int,
setor_schema: SetorSchema,
db: Session = Depends(get_session),
current_user: User = Depends(current_active_user),
):
response_update_setor = await SetorRepository(
setor_id=setor_id,
db=db,
current_user=current_user,
obj_schema_setor=setor_schema,
).update_setor()
return response_update_setor
'''@setor_router.delete("/{setor_id}", response_model=SetorSchemaOutputDeleted)
async def delete_setor(
setor_id: int,
db: Session = Depends(get_session),
current_user: User = Depends(current_active_user),
):
response_delete_setor = await SetorRepository(
setor_id=setor_id, db=db, current_user=current_user
).delete_setor()
return response_delete_setor
'''
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment