Skip to content

Instantly share code, notes, and snippets.

@WilliamStam
Created December 14, 2023 12:56
Show Gist options
  • Save WilliamStam/963dce6f4416d43c0f2efd5372838fc2 to your computer and use it in GitHub Desktop.
Save WilliamStam/963dce6f4416d43c0f2efd5372838fc2 to your computer and use it in GitHub Desktop.
FastAPI event system
import dataclasses
import inspect
import logging
from typing import Annotated, Callable, Dict, List

from fastapi import Depends, FastAPI
from fastapi.dependencies.models import Dependant
from fastapi.dependencies.utils import get_dependant, solve_dependencies
from starlette.requests import Request

from utilities.events import EventsCollection
logger = logging.getLogger(__name__)
class EventsCollection():
    """Registry of event listeners whose parameters are resolved by FastAPI.

    Listeners are stored per event class under the key
    ``"<module>.<ClassName>"``. On dispatch every listener registered for the
    event's class is awaited with the event as its first positional argument
    and its remaining parameters filled in through FastAPI dependency
    resolution against the supplied request.
    """

    def __init__(self):
        # Registry key -> listeners, in registration order.
        self.events: Dict[str, List[Callable]] = {}

    @staticmethod
    def _event_key(event_type) -> str:
        """Return the registry key (``module.ClassName``) for an event class."""
        return f"{event_type.__module__}.{event_type.__name__}"

    def register(self, event, listener):
        """Subscribe ``listener`` to the event class ``event``.

        Args:
            event: The event *class* (not an instance) to listen for.
            listener: Awaitable callable invoked as ``listener(event, **deps)``.
        """
        event_name = self._event_key(event)
        logger.debug("registering event [%s] with listener [%s]", event_name, listener)
        self.events.setdefault(event_name, []).append(listener)

    async def get_events(self, event):
        """Return the listeners registered for the class of ``event``.

        Args:
            event: An event *instance*.

        Returns:
            The registry's own list for that event class, or a new empty list
            when nothing is registered.
        """
        event_name = self._event_key(event.__class__)
        logger.debug("getting listeners for event [%s]", event_name)
        return self.events.get(event_name, [])

    async def dispatch(self, event, request, **kwargs) -> None:
        """Await every listener registered for the class of ``event``.

        Args:
            event: The event instance; passed positionally to each listener.
            request: The current request, used to resolve listener dependencies.
            **kwargs: Overrides. A value replaces the resolved one only for
                parameter names that dependency resolution produced.
        """
        event_name = self._event_key(event.__class__)
        logger.debug("dispatching event [%s]", event_name)
        for listener in self.events.get(event_name, ()):
            logger.debug("found listener [%s]", listener)
            # NOTE(review): solve_dependencies/get_dependant are FastAPI
            # internals; this call signature and the tuple-shaped result match
            # the FastAPI release the file was written against - confirm
            # before upgrading FastAPI.
            solved_result = await solve_dependencies(
                request=request,
                dependant=get_dependant(call=listener, path=request.url.path),
            )
            # Resolution errors are intentionally not fatal: the listener's own
            # event parameter carries no FastAPI marker, so it appears to be
            # reported as a missing request parameter on ordinary requests.
            sub_values, _sub_errors, *_ = solved_result

            # Caller-supplied overrides win, but only for names that were resolved.
            for key in sub_values.keys() & kwargs.keys():
                sub_values[key] = kwargs[key]

            # The event is passed positionally. If the request happened to carry
            # a parameter with the same name as the listener's first argument
            # (e.g. ``?event=x``) it would be resolved into sub_values and the
            # call would fail with "got multiple values for argument".
            event_param = next(iter(inspect.signature(listener).parameters), None)
            sub_values.pop(event_param, None)

            # Log parameter names only; the values may hold users or tokens.
            logger.debug(
                "calling listener [%s] with resolved parameters %s",
                listener,
                sorted(sub_values),
            )
            await listener(event, **sub_values)
# Module-level registry instance: listeners are registered on it and events
# are dispatched through it.
events = EventsCollection()
# Disabled alternative alias, kept for reference; the `Events` alias actually
# used is defined further down on top of EventDependency.
# Events = Annotated[EventsCollection, Depends(events)]
@dataclasses.dataclass
class BasicEvent:
    """Example event payload that is handed to listeners on dispatch."""

    # Two free-form string fields; nothing in this file interprets their values.
    bird: str
    owl: str
# Value a listener can ask for through FastAPI dependency injection
# (it is produced by `basic_dependency`).
@dataclasses.dataclass
class ListenerDependency:
    """Example dependency object resolved for a listener at dispatch time."""

    # Two free-form string fields used only for demonstration.
    dog: str
    cat: str
# Dependency provider for listeners that want a ListenerDependency.
async def basic_dependency():
    """Build and return the fixed demo ``ListenerDependency``."""
    dependency = ListenerDependency(dog="woof", cat="meow")
    return dependency
# Annotated alias: a parameter typed `BasicDependency` asks FastAPI to call
# `basic_dependency` and inject the ListenerDependency it returns.
BasicDependency = Annotated[ListenerDependency, Depends(basic_dependency)]
# Simplest listener: takes only the event, no injected dependencies.
async def listener1(event):
    """Print a marker line followed by the received event."""
    for line in ("LISTENER1 CALLED", event):
        print(line)
async def listener2(event, dep: BasicDependency):
    """Print a marker line, the received event and the injected dependency."""
    for line in ("LISTENER2 CALLED", event, dep):
        print(line)
# can take any fastapi dependency
from dependencies.user import CurrentUser, get_token
async def listener3(event, user: CurrentUser, token=Depends(get_token)):
    """Print a marker line, the event, the injected user and the injected token."""
    for line in ("LISTENER3 CALLED", event, user, token):
        print(line)
# Subscribe the three demo listeners to BasicEvent; dispatch walks the
# listener list in this registration order.
events.register(BasicEvent, listener1)
events.register(BasicEvent, listener2)
events.register(BasicEvent, listener3)
# FastAPI application hosting the demo route.
app = FastAPI()
class EventDependency:
    """Request-bound wrapper around the module-level ``events`` registry."""

    def __init__(self, request: Request):
        # Remember the request so dispatch() can pass it on for dependency resolution.
        self.request = request

    async def __call__(self, *args, **kwargs):
        """Return the shared registry; positional and keyword arguments are ignored."""
        logger.debug("EventDependency __call__")
        return events

    async def dispatch(self, ev, **kwargs):
        """Dispatch ``ev`` through the shared registry using this instance's request.

        Extra keyword arguments are forwarded to the registry unchanged.
        """
        logger.debug("EventDependency dispatch")
        registry = events
        await registry.dispatch(ev, request=self.request, **kwargs)
# Annotated alias: a parameter typed `Events` asks FastAPI to construct an
# EventDependency (which receives the current Request) and inject it.
Events = Annotated[EventDependency, Depends(EventDependency)]
@app.get("/")
async def testing(events: Events) -> str:
    """Demo endpoint: dispatch a ``BasicEvent`` and return a fixed string.

    The ``events`` parameter is the injected request-bound dispatcher; inside
    this function it shadows the module-level registry of the same name.
    """
    await events.dispatch(BasicEvent(bird="squeak", owl="hiss"))
    return "woof"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment