Skip to content

Instantly share code, notes, and snippets.

@Lord-V15
Last active December 21, 2023 02:13
Show Gist options
  • Save Lord-V15/abfd5cb85b6db5b677bcc64e1376c1cb to your computer and use it in GitHub Desktop.
Save Lord-V15/abfd5cb85b6db5b677bcc64e1376c1cb to your computer and use it in GitHub Desktop.
Using FastAPI with Deta's Base
# all of the login functionality was handled by AWS Cognito.
# lib imports
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
from typing import Optional
from deta import Deta
import os
from dotenv import load_dotenv
# .env file loading
load_dotenv()
db_key = os.getenv("DETA_DB")
db_name = os.getenv("DETA_DB_USERS")
# data models
class NewUser(BaseModel):
email: str
key: str
name: str
country: str
# phone: Optional[str] = None
class User(BaseModel):
email: str
name: str
country: str
# phone: Optional[str] = None
# APIs (with exception handling)
router = APIRouter()
@router.get("/users/{user_id}", response_model=NewUser, tags=["database"])
async def read_user_by_id(user_id: str):
deta = Deta(db_key)
db = deta.Base(db_name)
try:
user = db.get(key=user_id)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
if user is None:
raise HTTPException(
status_code=404, detail="The user doesn't exist")
return user
@router.post("/users/", tags=["database"], status_code=status.HTTP_201_CREATED)
async def sign_up(user: NewUser):
deta = Deta(db_key)
db = deta.Base(db_name)
if db.get(key=user.key) is not None:
raise HTTPException(status_code=409, detail="User already exists")
else :
try :
db.put({
"key": user.key,
"email": user.email,
"name": user.name,
"country": user.country
})
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return "User Added Succesfully"
@router.put("/users/{user_id}", tags=["database"])
async def update_user(user_id: str, user: User):
deta = Deta(db_key)
db = deta.Base(db_name)
if db.get(key=user_id) is None:
raise HTTPException(status_code=404, detail="The user doesn't exist")
else:
try:
db.put({
"key": user_id,
"email": user.email,
"name": user.name,
"country": user.country
})
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return "User Updated Succesfully"
@router.delete("/users/{user_id}", tags=["database"])
async def delete_user(user_id: str):
deta = Deta(db_key)
db = deta.Base(db_name)
if db.get(key=user_id) is None:
raise HTTPException(status_code=404, detail="The user doesn't exist")
else:
try:
db.delete(user_id)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
return "User Deleted Succesfully"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment