from typing import List from passlib.context import CryptContext from fastapi import Depends, APIRouter, HTTPException from sqlalchemy.orm import Session from . import models from starship.database import engine, get_db from starship.schemas import Starship, StarshipResponse, UserResponse, User app = FastAPI( title="USERS API", description="All CRUD operations for Users" ) models.Base.metadata.create_all(engine) pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") @app.post('/user',tags=['users']) def create_user( request: User, db: Session = Depends(get_db)): user= models.User( id=request.id, email=request.email, password= pwd_context.hash(request.password) ) db.add(user) db.commit() db.refresh(user) return user @app.get('/user/{id}',tags=['users']) def get_user(id: str, db: Session = Depends(get_db)): return db.query(models.User).filter(models.User.id == id).first() @app.delete('/user/{id}', tags=['users']) def delete_user( id: int, db: Session = Depends(get_db)): db.query(models.User).filter(models.User.id == id).delete() db.commit() return "User deleted successfully" @app.get('/users', response_model=List[UserResponse], tags=['users']) def get_users(db: Session = Depends(get_db)): return db.query(models.User).all() @app.put('/user/{id}', tags=['users']) def update_user(id: str, request: User, db: Session = Depends(get_db)): user = db.query(models.User).filter(models.User.id == id) if not user: raise HTTPException(status_code=404) user.update(request.dict()) db.commit() return "User updated successfully"