Skip to content

Instantly share code, notes, and snippets.

@horodchukanton
Last active February 10, 2023 08:41
Show Gist options
  • Save horodchukanton/9ca868de0788a7dd8f6162d27fd6e873 to your computer and use it in GitHub Desktop.
Save horodchukanton/9ca868de0788a7dd8f6162d27fd6e873 to your computer and use it in GitHub Desktop.
FastAPI generic CRUD router
import codecs
import csv
import logging
from typing import List, Type, Union, Dict
from fastapi import (
HTTPException, APIRouter, Request, Response, File,
UploadFile, )
from fastapi.responses import StreamingResponse
from pydantic import ValidationError, BaseModel
from sqlalchemy.exc import (NoResultFound, IntegrityError)
from application.core.persistence import Base
from application.core.queue.controller import SimpleTasksController
from application.core.queue.storage import MissingEntryRequestedException
from application.core.schemas.bulk_update import BulkUpdateValues
from application.core.schemas.diff import GenericDifferenceResponse
from application.core.schemas.import_result import ImportResponse
from application.core.schemas.options_bulk_request import BulkOptionsRequest
from application.core.schemas.queued_task import (
OperationStatusResponse,
OperationQueuedResponse, )
class AbstractCrudRouter(APIRouter):
    """Reusable FastAPI router exposing list/read/create/update/delete routes.

    Concrete routers subclass this and fill in the class attributes below
    (schemas, ``crud`` object and route paths). On construction the router
    registers seven routes (list, read, update, bulk update, create, delete,
    bulk delete) whose handlers delegate to ``self.crud``.

    Validation problems are reported as HTTP 422 and, where the handler
    catches it, ``sqlalchemy.exc.NoResultFound`` is reported as HTTP 404.
    """

    # Query parameters owned by ``get_entities_list`` itself; they must never
    # be forwarded as search filters, otherwise ``crud.list`` would receive
    # the same keyword twice.
    _PAGINATION_PARAMS = frozenset(("offset", "limit", "order_by", "order"))

    # Persistence model handled by the router (not read by the code in this
    # class; available to subclasses / the crud object).
    model: Base = None
    # Schema used as response model, as the source of the entity name and of
    # the filterable field names, and as the request body of ``create_entity``.
    result_schema: Type[BaseModel] = None
    # Schema built from path/query values to identify a single entry.
    search_schema: Type[BaseModel] = None
    # Schema holding the values applied by update operations.
    update_schema: Type[BaseModel] = None
    # Not referenced by the code in this class.
    diff_schema = GenericDifferenceResponse
    # Object that performs the actual persistence work. The handlers call
    # count, list, read, update, update_bulk, delete, delete_bulk, create and
    # table_columns on it.
    crud = None
    # Replaced with a fresh SimpleTasksController per instance in __init__.
    tasks_controller = None
    # NOTE(review): this class-level value is never passed to
    # APIRouter.__init__ by this class; presumably APIRouter assigns its own
    # instance-level ``prefix`` and shadows it -- confirm before relying on it.
    prefix: str = None
    # Route paths. ``list_path`` is also the base of the two ``/bulk`` routes.
    list_path: str = None
    get_path: str = None
    create_path: str = None
    update_path: str = None
    delete_path: str = None

    def __init__(self, *args, **kwargs):
        """Initialise the underlying APIRouter and register the CRUD routes.

        All positional and keyword arguments are forwarded unchanged to
        ``APIRouter.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.logger = logging.getLogger("crud_" + self.get_entity_name())
        self.tasks_controller = SimpleTasksController()
        self._build_router()

    def _build_router(self):
        """Register every CRUD route on this router.

        The registration order of the original implementation is preserved.
        NOTE(review): routers usually match routes in registration order, so
        whether ``update_path``/``delete_path`` can capture the ``/bulk``
        routes depends on the concrete path values -- verify per subclass.
        """
        entity = self.get_entity_name()
        bulk_path = self.list_path + '/bulk'

        super().add_api_route(
            name="get_" + entity + "_list",
            path=self.list_path, methods=['get'],
            response_model=List[self.result_schema],
            operation_id="list" + entity,
            endpoint=self.get_entities_list)
        super().add_api_route(
            name="read_" + entity,
            path=self.get_path, methods=['get'],
            response_model=self.result_schema,
            operation_id="get" + entity,
            endpoint=self.get_entity)
        super().add_api_route(
            name="update_" + entity,
            path=self.update_path, methods=['put'],
            response_model=self.result_schema,
            operation_id="update" + entity,
            endpoint=self.update_entity)
        # The bulk routes intentionally declare no response_model, exactly
        # like the delete route below.
        super().add_api_route(
            name="bulk_update_" + entity,
            path=bulk_path,
            methods=['put'],
            operation_id="bulk_update" + entity,
            endpoint=self.update_entity_bulk)
        super().add_api_route(
            name="create_" + entity,
            path=self.create_path, methods=['post'],
            response_model=self.result_schema,
            operation_id="create" + entity,
            endpoint=self.create_entity)
        super().add_api_route(
            name="delete_" + entity,
            path=self.delete_path, methods=['delete'],
            operation_id="delete" + entity,
            endpoint=self.delete_entity)
        super().add_api_route(
            name="bulk_delete_" + entity,
            path=bulk_path, methods=['delete'],
            operation_id="bulk_delete" + entity,
            endpoint=self.delete_entity_bulk)

    def get_entity_name(self):
        """Return the class name of ``result_schema``.

        The name is used to build route names, operation ids and the logger
        name.
        """
        return self.result_schema.__name__

    def get_entities_list(self, request: Request, response: Response,
                          offset: int = 0, limit: int = 100,
                          order_by: str = None, order: str = "asc",
                          ):
        """Return one page of entries, filtered by schema fields in the query.

        Every query parameter whose name is a property of ``result_schema``
        is forwarded to ``crud.count`` and ``crud.list`` as a keyword filter.
        The total number of matching entries is exposed through the
        ``x-total-count`` response header.

        Args:
            request: Incoming request; only its query parameters are read.
            response: Outgoing response; receives the count headers.
            offset: Forwarded to ``crud.list``.
            limit: Forwarded to ``crud.list``.
            order_by: Forwarded to ``crud.list``.
            order: Forwarded to ``crud.list``.

        Returns:
            Whatever ``crud.list`` returns.
        """
        # Resolve the schema properties once instead of once per parameter.
        schema_fields = self.result_schema.schema()['properties']
        # Pagination/ordering names are excluded even if the schema has a
        # field with the same name: they are passed explicitly below and a
        # duplicate keyword would raise TypeError.
        search_fields = {
            param: request.query_params[param]
            for param in request.query_params
            if param in schema_fields
            and param not in self._PAGINATION_PARAMS
        }

        count = self.crud.count(**search_fields)
        response.headers["x-total-count"] = str(count)
        response.headers["access-control-expose-headers"] = "x-total-count"

        return self.crud.list(offset=offset, limit=limit,
                              order_by=order_by,
                              order=order, **search_fields)

    def get_entity(self, **kwargs):
        """Read the single entry identified by ``kwargs``.

        Args:
            **kwargs: Values used to build a ``search_schema`` instance.

        Returns:
            Whatever ``crud.read`` returns.

        Raises:
            HTTPException: 422 on ``ValidationError``, 404 on
                ``NoResultFound``.
        """
        try:
            result = self.crud.read(self.search_schema(**kwargs))
        except ValidationError as ve:
            raise HTTPException(status_code=422,
                                detail=f"Invalid values: {ve}") from ve
        except NoResultFound as nrf:
            raise HTTPException(status_code=404,
                                detail="Entry is not found") from nrf
        return result

    # NOTE(review): the ``update_schema`` annotation is evaluated while this
    # class body executes, i.e. against the base-class value (None), not the
    # value a subclass assigns later -- subclasses presumably need to override
    # this method with a concrete annotation; confirm.
    def update_entity(self, update_values: update_schema, **kwargs):
        """Update the single entry identified by ``kwargs``.

        Args:
            update_values: New values, passed through to ``crud.update``.
            **kwargs: Values used to build a ``search_schema`` instance.

        Returns:
            Whatever ``crud.update`` returns.

        Raises:
            HTTPException: 422 on ``ValidationError``, 404 on
                ``NoResultFound`` (same mapping as read and delete).
        """
        try:
            search_object = self.search_schema(**kwargs)
            return self.crud.update(search_object, update_values)
        except ValidationError as ve:
            raise HTTPException(status_code=422,
                                detail=f"Invalid values: {ve}") from ve
        except NoResultFound as nrf:
            raise HTTPException(status_code=404,
                                detail="Entry is not found") from nrf

    def update_entity_bulk(self, bulk_request_body: BulkUpdateValues):
        """Apply one set of update values to several entries.

        Args:
            bulk_request_body: Its ``entities`` are parsed into
                ``search_schema`` objects and its ``update`` into an
                ``update_schema`` object.

        Returns:
            Whatever ``crud.update_bulk`` returns.

        Raises:
            HTTPException: 422 on ``ValidationError``.
        """
        try:
            item_list = [self.search_schema.parse_obj(o)
                         for o in bulk_request_body.entities]
            update_values = self.update_schema.parse_obj(
                bulk_request_body.update)
            return self.crud.update_bulk(item_list, update_values)
        except ValidationError as ve:
            raise HTTPException(status_code=422,
                                detail=f"Invalid values: {ve}") from ve

    def delete_entity(self, **kwargs):
        """Delete the single entry identified by ``kwargs``.

        Args:
            **kwargs: Values used to build a ``search_schema`` instance.

        Returns:
            ``{"detail": "ok"}`` on success.

        Raises:
            HTTPException: 422 on ``ValidationError``, 404 on
                ``NoResultFound``.
        """
        try:
            search_object = self.search_schema(**kwargs)
            self.crud.delete(search_object)
        except ValidationError as ve:
            raise HTTPException(status_code=422,
                                detail=f"Invalid values: {ve}") from ve
        except NoResultFound as nrf:
            raise HTTPException(status_code=404,
                                detail="Entry is not found") from nrf
        return {"detail": "ok"}

    def delete_entity_bulk(self, item_list: List):
        """Delete several entries in one call.

        Args:
            item_list: Raw items, each parsed into a ``search_schema``
                object.

        Returns:
            ``{"detail": "ok"}`` on success.

        Raises:
            HTTPException: 422 on ``ValidationError``.
        """
        try:
            # parse_obj (as used by update_entity_bulk) reports items that
            # are not mappings as ValidationError, so they end up as 422
            # instead of an unhandled TypeError from ``**item``.
            validated = [self.search_schema.parse_obj(i) for i in item_list]
            self.crud.delete_bulk(validated)
        except ValidationError as ve:
            raise HTTPException(status_code=422,
                                detail=f"Invalid values: {ve}") from ve
        return {"detail": "ok"}

    # NOTE(review): ``Union[result_schema]`` is evaluated while this class
    # body executes, when ``result_schema`` is still None -- subclasses
    # presumably need to override this method with a concrete annotation;
    # confirm.
    def create_entity(self, json_body: Union[result_schema]):
        """Create a new entry from the request body.

        Args:
            json_body: Parsed body; its ``dict()`` is passed to
                ``crud.create`` as keyword arguments.

        Returns:
            Whatever ``crud.create`` returns.

        Raises:
            HTTPException: 422 on ``IntegrityError``.
        """
        try:
            return self.crud.create(**json_body.dict())
        except IntegrityError as e:
            # NOTE(review): the message names "Provider and Provider
            # Service" although the router is generic; kept unchanged
            # because clients may depend on the text.
            raise HTTPException(
                status_code=422,
                detail="Unique or another DB constraint failed, either"
                       " record for Provider and Provider Service"
                       " already exists or such combination"
                       " of Provider and Provider Service is not acceptable: "
                       "" + str(e)) from e

    def _model_to_schema(self, model_object: BaseModel):
        """Build a ``result_schema`` instance from ``model_object``.

        Reads from ``model_object`` every attribute named by
        ``crud.table_columns()``.
        NOTE(review): the parameter is annotated as pydantic ``BaseModel``
        but is presumably a persistence model instance -- confirm.
        """
        keys = self.crud.table_columns()
        values = {c: getattr(model_object, c) for c in keys}
        return self.result_schema(**values)

    def _model_to_dict(self, model_object: BaseModel):
        """Return ``model_object`` as a dict, converted via ``result_schema``."""
        return dict(self._model_to_schema(model_object))
@horodchukanton
Copy link
Author

This code is incomplete and will not work without a concrete crud and a few other packages, but it gives an understanding of how to reuse routes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment