Last active
December 7, 2023 12:52
-
-
Save sdkfz181tiger/630f090a9db3fbf81f54d6184989b95d to your computer and use it in GitHub Desktop.
FastAPI基礎04_SQLAlchemy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
Local | |
1, Install | |
$ python3 -m pip install fastapi==0.100.0 'uvicorn[standard]==0.22.0' | |
2, Run | |
$ uvicorn main:app --reload | |
3, Docs | |
http://127.0.0.1:8000/docs | |
4, Check | |
$ curl -X GET http://127.0.0.1:8000/menu | |
$ curl -X POST http://127.0.0.1:8000/menu -H "Content-Type: application/json" -d '{"name":"カレーランチ", "comment":"これが究極!!"}' | |
$ curl -X PUT http://127.0.0.1:8000/menu -H "Content-Type: application/json" -d '{"uid":"1", "name":"カレーセット", "comment":"これぞ至高!!"}' | |
$ curl -X DELETE http://127.0.0.1:8000/menu/1 | |
""" | |
from fastapi import FastAPI | |
from modules import rt_crud, sc_menu | |
# Instance | |
# ASGI application object; `uvicorn main:app` looks up this name.
app = FastAPI()
# Attach the routes exposed by the rt_crud module.
app.include_router(rt_crud.router)
# Routing | |
@app.get("/")
def hello_world():
    """Return a fixed greeting payload for the root path."""
    greeting = {"greeting": "Hello, World!!"}
    return greeting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
from fastapi import APIRouter, Depends | |
from pydantic import BaseModel, Field | |
from sqlalchemy.orm import Session | |
from typing import List | |
import modules.sc_menu as sc | |
import modules.sql_alchemy as ar | |
# Instance | |
# Router object that the endpoints below register themselves on.
router = APIRouter()
# Runs at import time: ask the SQLAlchemy helper module to create its table.
ar.create_table()
#========== | |
# Convert (Records -> Model) | |
def convertRecord2Model(record):
    """Convert a database record into a ``sc.MenuModel``.

    Args:
        record: Object exposing ``uid``, ``name`` and ``comment``
            attributes, or ``None`` when no record is available.

    Returns:
        A ``sc.MenuModel`` populated from the record. When ``record`` is
        ``None`` a placeholder model with ``uid=-1`` and empty strings is
        returned instead.
    """
    # Identity test: ``== None`` can be overridden by a custom ``__eq__``.
    if record is None:
        return sc.MenuModel(uid=-1, name="", comment="")
    return sc.MenuModel(uid=record.uid, name=record.name, comment=record.comment)
#========== | |
# Request | |
# Create | |
class RequestCreate(BaseModel):
    """Request body accepted when creating a menu record."""

    # Name of the menu entry.
    name: str
    # Comment stored alongside the entry.
    comment: str
class RequestUpdate(BaseModel):
    """Request body accepted when updating an existing menu record."""

    # Identifier of the record to update.
    uid: int
    # New name for the entry.
    name: str
    # New comment for the entry.
    comment: str
#========== | |
# CRUD | |
# Create | |
@router.post("/menu")
async def create_menu(db: Session = Depends(ar.get_db_yield), req: RequestCreate = None):
    """Create a menu record from the JSON request body.

    Args:
        db: Session supplied through ``Depends(ar.get_db_yield)``.
        req: Parsed request body carrying ``name`` and ``comment``.

    Returns:
        ``{"uid": <value returned by ar.insert_record>}``.

    Raises:
        HTTPException: 422 when the request carries no body.
    """
    # The ``None`` default makes the body optional, so a body-less request
    # reaches this point; reject it explicitly instead of failing with an
    # AttributeError on ``None.dict()``.
    if req is None:
        from fastapi import HTTPException

        raise HTTPException(status_code=422, detail="Request body is required")
    uid = ar.insert_record(db, **req.dict())
    return {"uid": uid}
# Read | |
@router.get("/menu", response_model=List[sc.MenuModel])
async def read_menu_all(db: Session = Depends(ar.get_db_yield)):
    """Return all menu records as ``sc.MenuModel`` objects.

    Args:
        db: Session supplied through ``Depends(ar.get_db_yield)``.

    Returns:
        A list with one converted model per record, in the order in which
        ``ar.read_records`` yields them.
    """
    records = ar.read_records(db)
    # A comprehension avoids the local named ``list`` that shadowed the builtin.
    return [convertRecord2Model(record) for record in records]
# Read | |
@router.get("/menu/{uid}", response_model=sc.MenuModel)
async def read_menu_one(db: Session = Depends(ar.get_db_yield), uid: int = -1):
    """Return the menu record identified by ``uid`` as a ``sc.MenuModel``.

    The lookup result is passed straight to ``convertRecord2Model``, which
    decides what to return when the lookup produced ``None``.
    """
    return convertRecord2Model(ar.read_record(db, uid))
# Update | |
@router.put("/menu")
async def update_menu(db: Session = Depends(ar.get_db_yield), req: RequestUpdate = None):
    """Update a menu record from the JSON request body.

    Args:
        db: Session supplied through ``Depends(ar.get_db_yield)``.
        req: Parsed request body carrying ``uid``, ``name`` and ``comment``.

    Returns:
        ``{"uid": <value returned by ar.update_record>}``.

    Raises:
        HTTPException: 422 when the request carries no body.
    """
    # The ``None`` default makes the body optional, so a body-less request
    # reaches this point; reject it explicitly instead of failing with an
    # AttributeError on ``None.dict()``.
    if req is None:
        from fastapi import HTTPException

        raise HTTPException(status_code=422, detail="Request body is required")
    uid = ar.update_record(db, **req.dict())
    return {"uid": uid}
# Delete | |
@router.delete("/menu/{uid}")
async def delete_menu(db: Session = Depends(ar.get_db_yield), uid: int = -1):
    """Delete the menu record identified by ``uid``.

    Returns:
        ``{"uid": <value returned by ar.delete_record>}``.
    """
    result_uid = ar.delete_record(db, uid=uid)
    return {"uid": result_uid}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
""" | |
1, Install | |
$ python3 -m pip install sqlalchemy==2.0.15 | |
""" | |
import datetime, hashlib, os | |
from sqlalchemy import * | |
from sqlalchemy.ext.declarative import declarative_base | |
from sqlalchemy.orm import Session, sessionmaker | |
from sqlalchemy.orm.exc import NoResultFound | |
# SQLAlchemy | |
# Directory of this module; the SQLite file is placed inside it.
dir_base = os.path.dirname(__file__)
db_path = f"sqlite:///{os.path.join(dir_base, 'data.sqlite')}"
# Table name used by the Record model below.
db_table = "record"

# Engine and session factory shared by every helper in this module.
db_engine = create_engine(db_path, echo=True)
db_session = sessionmaker(bind=db_engine)
# Declarative base class for the ORM models.
Base = declarative_base()
# Session | |
def get_db():
    """Create and return a new session from the module's session factory."""
    session = db_session()
    return session
# Session(For FastAPI) | |
def get_db_yield():
    """Yield a fresh session and close it when the generator is finalized.

    Generator form of ``get_db`` (the original comment marks it as the
    variant intended for FastAPI).
    """
    session = get_db()
    try:
        yield session
    finally:
        # Runs whether the consumer finished normally or raised.
        session.close()
# Model | |
class Record(Base):
    """ORM model for one row of the table named by ``db_table``."""

    # Table
    __tablename__ = db_table

    # ID (auto-incrementing primary key)
    uid = Column(Integer, primary_key=True, autoincrement=True)
    # Name
    name = Column(String(12), server_default="noname")
    # Comment
    comment = Column(String(24), server_default="nocomment")
    # Timestamp (stored as text)
    time_stamp = Column(String(24), server_default="1970/01/01 00:00:00")

    def __init__(self, name, comment, time_stamp):
        """Store the given name, comment and timestamp on the new row."""
        self.name, self.comment, self.time_stamp = name, comment, time_stamp

    def __str__(self):
        """Return a one-line, human-readable dump of all four columns."""
        return (
            f"uid:{self.uid}, name:{self.name}, "
            f"comment:{self.comment}, time_stamp:{self.time_stamp}"
        )
# CRUD(Create) | |
def create_table():
    """Create the tables registered on ``Base`` using ``db_engine``."""
    print("create_table:", db_table)
    metadata = Base.metadata
    metadata.create_all(db_engine)
def clear_table(db):
    """Delete every ``Record`` row through ``db`` and commit.

    Args:
        db: Session used to execute the DELETE statement.
    """
    print("clear_table:", db_table)
    delete_all = delete(Record)
    db.execute(delete_all)
    db.commit()
# CRUD(Insert) | |
def insert_record(db, name, comment):
    """Insert a new ``Record`` stamped with the current time and commit.

    Args:
        db: Session used for the insert.
        name: Value for the ``name`` column.
        comment: Value for the ``comment`` column.

    Returns:
        The ``uid`` of the newly inserted record.
    """
    print("insert_record:", name, comment)
    new_row = Record(name=name, comment=comment, time_stamp=get_time())
    db.add(new_row)
    db.commit()
    return new_row.uid
# CRUD(Read) | |
def read_records(db):
    """Return all ``Record`` rows ordered by ``uid`` descending.

    Args:
        db: Session used for the query.

    Returns:
        Whatever ``db.scalars`` returns for the SELECT statement.
    """
    print("read_records!!")
    by_uid_desc = select(Record).order_by(Record.uid.desc())
    return db.scalars(by_uid_desc)
def read_record(db, uid):
    """Fetch the ``Record`` whose ``uid`` matches, or ``None`` if absent.

    Args:
        db: Session used for the query.
        uid: Primary-key value to look up.

    Returns:
        The matching ``Record``, or ``None`` when no row matches.
    """
    print("read_record:", uid)
    by_uid = select(Record).where(Record.uid == uid)
    # one_or_none() yields None for an empty result, which is what the
    # NoResultFound handler around one() produced.
    return db.scalars(by_uid).one_or_none()
# CRUD(Update) | |
def update_record(db, uid, name, comment):
    """Update the name, comment and timestamp of an existing record.

    Args:
        db: Session used for the lookup and the commit.
        uid: Primary-key value of the record to update.
        name: New value for the ``name`` column.
        comment: New value for the ``comment`` column.

    Returns:
        The ``uid`` of the updated record, or ``-1`` when no record with
        that ``uid`` exists.
    """
    print("update_record:", uid)
    record = read_record(db, uid)
    # Identity test rather than ``== None``.
    if record is None:
        return -1
    record.name = name
    record.comment = comment
    record.time_stamp = get_time()
    db.commit()
    return record.uid
# CRUD(Delete) | |
def delete_record(db, uid):
    """Delete the record identified by ``uid`` and commit.

    Args:
        db: Session used for the lookup, the delete and the commit.
        uid: Primary-key value of the record to delete.

    Returns:
        The ``uid`` of the deleted record, or ``-1`` when no record with
        that ``uid`` exists.
    """
    print("delete_record:", uid)
    record = read_record(db, uid)
    # Identity test rather than ``== None``.
    if record is None:
        return -1
    # Capture the key first so the deleted instance is not touched again
    # after the commit.
    deleted_uid = record.uid
    db.delete(record)
    db.commit()
    return deleted_uid
# Hash | |
def get_hash(text):
    """Return the hexadecimal MD5 digest of ``text``.

    Args:
        text: String to hash; it is encoded with ``str.encode()`` defaults.

    Returns:
        The 32-character lowercase hex digest.
    """
    digest = hashlib.md5(text.encode())
    return digest.hexdigest()
# Time | |
def get_time():
    """Return the current time formatted as ``YYYY/MM/DD HH:MM:SS``.

    The value comes from ``datetime.datetime.now()`` called without a
    timezone argument.
    """
    now = datetime.datetime.now()
    return now.strftime("%Y/%m/%d %H:%M:%S")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment