-
-
Save smizell/3686a575de4f7e30c2616c557b34c385 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pickle | |
import os | |
import uuid | |
from dataclasses import dataclass, field | |
from datetime import datetime | |
from enum import Enum | |
from pathlib import Path | |
from typing import Generic, List, Optional, TypeVar, Union | |
from fastapi import FastAPI, HTTPException, Request | |
from fastapi.responses import JSONResponse | |
from fastapi.responses import HTMLResponse | |
from pydantic import BaseModel, ValidationError | |
from pydantic.generics import GenericModel | |
STORAGE_FILE = "./storage.pickle" | |
# Finite State Machine (FSM) code | |
# --------------------------- | |
class TransitionError(Exception): | |
def __init__(self, message): | |
self.message = message | |
T = TypeVar("T") | |
U = TypeVar("U") | |
@dataclass | |
class Transition(Generic[T, U]): | |
action: T | |
from_state: U | |
to_state: U | |
@dataclass | |
class FSM(Generic[T, U]): | |
initial: T | |
states: List[T] | |
transitions: List[U] | |
def transitions(state: Enum, fsm: FSM) -> List[Transition]: | |
return [t for t in fsm.transitions if t.from_state == state] | |
def trigger(action: Enum, current_state: Enum, fsm: FSM) -> Enum: | |
ts = transitions(current_state, fsm) | |
next_action = next((t for t in ts if t.action == action), None) | |
if not next_action: | |
raise TransitionError( | |
f"Can't trigger {action.value} for state {current_state.value}" | |
) | |
return next_action.to_state | |
# Core models and logic | |
# --------------------------- | |
class BookmarkState(Enum): | |
QUEUED = "queued" | |
READ = "read" | |
ARCHIVED = "archived" | |
class BookmarkAction(Enum): | |
READ = "read" | |
ARCHIVE = "archive" | |
REREAD = "reread" | |
QUEUE = "queue" | |
class BookmarkTransition(Transition[BookmarkAction, BookmarkState]): | |
pass | |
class BookmarkFSM(FSM[BookmarkState, BookmarkTransition]): | |
pass | |
bookmark_fsm = BookmarkFSM( | |
initial=BookmarkState.QUEUED, | |
states=list(BookmarkState), | |
transitions=[ | |
BookmarkTransition( | |
action=BookmarkAction.READ, | |
from_state=BookmarkState.QUEUED, | |
to_state=BookmarkState.READ, | |
), | |
BookmarkTransition( | |
action=BookmarkAction.ARCHIVE, | |
from_state=BookmarkState.QUEUED, | |
to_state=BookmarkState.ARCHIVED, | |
), | |
BookmarkTransition( | |
action=BookmarkAction.QUEUE, | |
from_state=BookmarkState.ARCHIVED, | |
to_state=BookmarkState.QUEUED, | |
), | |
BookmarkTransition( | |
action=BookmarkAction.REREAD, | |
from_state=BookmarkState.READ, | |
to_state=BookmarkState.QUEUED, | |
), | |
], | |
) | |
class Bookmark(BaseModel): | |
bookmark_url: str | |
accessed: Optional[datetime] = None | |
title: str | |
description: str | |
read_status: BookmarkState = BookmarkState.QUEUED | |
# Storage Layer | |
# --------------------------- | |
@dataclass | |
class Record(Generic[T]): | |
id: uuid.UUID | |
data: T | |
@dataclass | |
class Storage: | |
records: Optional[List[Record]] = None | |
@classmethod | |
def from_file(cls, file_name): | |
if os.path.exists(file_name): | |
with open(file_name, "rb") as f: | |
return cls(records=pickle.load(f)) | |
return cls(records=[]) | |
def save(self, item): | |
r = Record(data=item, id=uuid.uuid4()) | |
self.records.append(r) | |
with open(STORAGE_FILE, "wb") as f: | |
pickle.dump(self.records, f) | |
return r | |
def filter_by_model(self, model_cls): | |
return (record for record in self.records if isinstance(record.data, model_cls)) | |
def find_by_id(self, model_cls, inst_id): | |
model_records = self.filter_by_model(model_cls) | |
match = (record for record in model_records if record.id == inst_id) | |
return next(match, None) | |
def update(self, model_cls, inst_id, **kwargs): | |
record = self.find_by_id(model_cls, inst_id) | |
record.data = record.data.copy(update=kwargs) | |
return record | |
action_to_url = { | |
"read": "read_url", | |
"archive": "archive_url", | |
"reread": "reread_url", | |
} | |
def bookmark_item_from_record(app: FastAPI, bookmark_record: Record[Bookmark]): | |
url = app.url_path_for("bookmark_item", bookmark_id=str(bookmark_record.id)) | |
data = bookmark_record.data.dict() | |
bookmark_item = BookmarkItem(url=url, data=data) | |
ts = transitions(bookmark_record.data.read_status, bookmark_fsm) | |
for t in ts: | |
setattr( | |
bookmark_item, | |
action_to_url[t.action.value], | |
app.url_path_for( | |
"update_bookmark_status", | |
bookmark_id=str(bookmark_record.id), | |
action=t.action.value, | |
), | |
) | |
return bookmark_item | |
def bookmark_collection_from_records(app: FastAPI, records: List[Record[Bookmark]]): | |
url = app.url_path_for("bookmark_collection") | |
bookmark_items = [bookmark_item_from_record(app, record) for record in records] | |
return BookmarkCollection(url=url, items=bookmark_items) | |
# Representors (Collection and Item) | |
# --------------------------- | |
class Item(GenericModel, Generic[T]): | |
url: str | |
data: T | |
class Collection(GenericModel, Generic[T]): | |
url: str | |
items: List[T] | |
# Bookmark resources | |
# --------------------------- | |
class BookmarkItem(Item[Bookmark]): | |
read_url: Optional[str] = None | |
archive_url: Optional[str] = None | |
reread_url: Optional[str] = None | |
class BookmarkCollection(Collection[BookmarkItem]): | |
pass | |
# HTTP API | |
# --------------------------- | |
app = FastAPI(title="Bookmark API") | |
storage = Storage.from_file(STORAGE_FILE) | |
@app.get("/", include_in_schema=False) | |
def root(): | |
return HTMLResponse(Path("index.html").read_text()) | |
@app.get( | |
"/bookmarks", | |
response_model=BookmarkCollection, | |
tags=["Bookmark"], | |
response_model_exclude_unset=True, | |
) | |
def bookmark_collection(): | |
book_records = storage.filter_by_model(Bookmark) | |
return bookmark_collection_from_records(app, book_records) | |
@app.post( | |
"/bookmarks", | |
response_model=BookmarkItem, | |
status_code=201, | |
tags=["Bookmark"], | |
response_model_exclude_unset=True, | |
) | |
def create_bookmark(bookmark: Bookmark): | |
bookmark_record = storage.save(bookmark) | |
return bookmark_item_from_record(app, bookmark_record) | |
@app.get( | |
"/bookmarks/{bookmark_id}", | |
response_model=BookmarkItem, | |
tags=["Bookmark"], | |
response_model_exclude_unset=True, | |
) | |
def bookmark_item(bookmark_id: uuid.UUID): | |
bookmark_record = storage.find_by_id(Bookmark, bookmark_id) | |
return bookmark_item_from_record(app, bookmark_record) | |
@app.exception_handler(TransitionError) | |
async def fsm_exception_handler(request: Request, exc: TransitionError): | |
return JSONResponse(status_code=400, content={"detail": exc.message}) | |
@app.post( | |
"/bookmarks/{bookmark_id}/{action}", | |
response_model=BookmarkItem, | |
tags=["Bookmark"], | |
response_model_exclude_unset=True, | |
) | |
def update_bookmark_status(bookmark_id: uuid.UUID, action: BookmarkAction): | |
bookmark_record = storage.find_by_id(Bookmark, bookmark_id) | |
next_state = trigger(action, bookmark_record.data.read_status, bookmark_fsm) | |
new_bookmark_record = storage.update(Bookmark, bookmark_id, read_status=next_state) | |
return bookmark_item_from_record(app, new_bookmark_record) | |
def graph_fsm(): | |
from graphviz import Digraph # type: ignore | |
g = Digraph(format="png") | |
g = Digraph( | |
name="bookmark-fsm", | |
node_attr={"shape": "oval"}, | |
graph_attr={"nodesep": "1.5", "label": "Bookmark FSM"}, | |
) | |
g.node("start", shape="point") | |
g.edge("start", bookmark_fsm.initial.value) | |
for t in bookmark_fsm.transitions: | |
g.edge(t.from_state.value, t.to_state.value, label=t.action.value) | |
g.render("bookmark-fsm", format="png") | |
if __name__ == "__main__": | |
graph_fsm() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment