Skip to content

Instantly share code, notes, and snippets.

@sdkfz181tiger
Last active December 7, 2023 12:52
Show Gist options
  • Save sdkfz181tiger/630f090a9db3fbf81f54d6184989b95d to your computer and use it in GitHub Desktop.
Save sdkfz181tiger/630f090a9db3fbf81f54d6184989b95d to your computer and use it in GitHub Desktop.
FastAPI基礎04_SQLAlchemy
# coding: utf-8
"""
Local
1, Install
$ python3 -m pip install fastapi==0.100.0 'uvicorn[standard]==0.22.0'
2, Run
$ uvicorn main:app --reload
3, Docs
http://127.0.0.1:8000/docs
4, Check
$ curl -X GET http://127.0.0.1:8000/menu
$ curl -X POST http://127.0.0.1:8000/menu -H "Content-Type: application/json" -d '{"name":"カレーランチ", "comment":"これが究極!!"}'
$ curl -X PUT http://127.0.0.1:8000/menu -H "Content-Type: application/json" -d '{"uid":"1", "name":"カレーセット", "comment":"これぞ至高!!"}'
$ curl -X DELETE http://127.0.0.1:8000/menu/1
"""
from fastapi import FastAPI
from modules import rt_crud, sc_menu
# Instance
app = FastAPI()
app.include_router(rt_crud.router)
# Routing
@app.get("/")
def hello_world():
return {"greeting": "Hello, World!!"}
# coding: utf-8
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from typing import List
import modules.sc_menu as sc
import modules.sql_alchemy as ar
# Instance
router = APIRouter()
# SQLAlchemy
ar.create_table()
#==========
# Convert (Records -> Model)
def convertRecord2Model(record):
if(record==None): return sc.MenuModel(uid=-1, name="", comment="")
return sc.MenuModel(uid=record.uid, name=record.name, comment=record.comment)
#==========
# Request
# Create
class RequestCreate(BaseModel):
name: str
comment: str
class RequestUpdate(BaseModel):
uid: int
name: str
comment: str
#==========
# CRUD
# Create
@router.post("/menu")
async def create_menu(db:Session=Depends(ar.get_db_yield), req:RequestCreate=None):
uid = ar.insert_record(db, **req.dict())
return {"uid": uid}
# Read
@router.get("/menu", response_model=List[sc.MenuModel])
async def read_menu_all(db:Session=Depends(ar.get_db_yield)):
records = ar.read_records(db)
list = []
for record in records:
list.append(convertRecord2Model(record))
return list
# Read
@router.get("/menu/{uid}", response_model=sc.MenuModel)
async def read_menu_one(db:Session=Depends(ar.get_db_yield), uid:int=-1):
record = ar.read_record(db, uid)
return convertRecord2Model(record)
# Update
@router.put("/menu")
async def update_menu(db:Session=Depends(ar.get_db_yield), req:RequestUpdate=None):
uid = ar.update_record(db, **req.dict())
return {"uid": uid}
# Delete
@router.delete("/menu/{uid}")
async def delete_menu(db:Session=Depends(ar.get_db_yield), uid:int=-1):
uid = ar.delete_record(db, uid=uid)
return {"uid": uid}
# coding: utf-8
from pydantic import BaseModel, Field
from typing import Optional
class MenuModel(BaseModel):
uid: int
name: str | None = Field(None, example="モーニングセット")
comment: str | None = Field(None, example="朝はこれで決まり!!")
# coding: utf-8
"""
1, Install
$ python3 -m pip install sqlalchemy==2.0.15
"""
import datetime, hashlib, os
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm.exc import NoResultFound
# SQLAlchemy
dir_base = os.path.dirname(__file__)
db_path = "sqlite:///" + os.path.join(dir_base, "data.sqlite")
db_table = "record"
# Engine
db_engine = create_engine(db_path, echo=True)
db_session = sessionmaker(bind=db_engine)
Base = declarative_base()
# Session
def get_db():
return db_session()
# Session(For FastAPI)
def get_db_yield():
db = get_db()
try:
yield db
finally:
db.close()
# Model
class Record(Base):
# Table
__tablename__ = db_table
# ID
uid = Column(Integer, primary_key=True, autoincrement=True)
# Name
name = Column(String(12), server_default="noname")
# Comment
comment = Column(String(24), server_default="nocomment")
# Timestamp
time_stamp = Column(String(24), server_default="1970/01/01 00:00:00")
def __init__(self, name, comment, time_stamp):
self.name = name
self.comment = comment
self.time_stamp = time_stamp
def __str__(self):
return "uid:{0}, name:{1}, comment:{2}, time_stamp:{3}".format(
self.uid, self.name, self.comment, self.time_stamp)
# CRUD(Create)
def create_table():
print("create_table:", db_table)
Base.metadata.create_all(db_engine)
def clear_table(db):
print("clear_table:", db_table)
db.execute(delete(Record))
db.commit()
# CRUD(Insert)
def insert_record(db, name, comment):
print("insert_record:", name, comment)
record = Record(name, comment, get_time())
db.add(record)
db.commit()
return record.uid
# CRUD(Read)
def read_records(db):
print("read_records!!")
stmt = select(Record).order_by(Record.uid.desc())
return db.scalars(stmt)
def read_record(db, uid):
print("read_record:", uid)
try:
stmt = select(Record).where(Record.uid == uid)
return db.scalars(stmt).one()
except NoResultFound:
return None
# CRUD(Update)
def update_record(db, uid, name, comment):
print("update_record:", uid)
record = read_record(db, uid)
if record == None: return -1
record.name = name
record.comment = comment
record.time_stamp = get_time()
db.commit()
return record.uid
# CRUD(Delete)
def delete_record(db, uid):
print("delete_record:", uid)
record = read_record(db, uid)
if record == None: return -1
db.delete(record)
db.commit()
return record.uid
# Hash
def get_hash(text):
return hashlib.md5(text.encode()).hexdigest()
# Time
def get_time():
return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment