Skip to content

Instantly share code, notes, and snippets.

@justcallmelarry
Last active April 19, 2024 09:01
Show Gist options
  • Save justcallmelarry/30d9e877c5637b3fd2ae83af25e3fa41 to your computer and use it in GitHub Desktop.
Save justcallmelarry/30d9e877c5637b3fd2ae83af25e3fa41 to your computer and use it in GitHub Desktop.
FastAPI signal handler

Useful when a child process (or other long-running work) must stop what it is doing as soon as a termination signal is received, rather than only once the service is actually shutting down.

import asyncio
import signal
from types import FrameType
from fastapi import FastAPI
from fastapi.responses import JSONResponse
# Application instance that the routes and startup hook below attach to.
app = FastAPI(title="example-service")
# Shutdown flag stored directly on the app object; it starts out False and is
# switched to True by the signal handler installed in append_signal().
app._shutting_down = False  # type: ignore[attr-defined]
def append_signal() -> None:
old = signal.getsignal(signal.SIGINT)
def helper(sig: int, frame: FrameType | None) -> None:
setattr(app, "_shutting_down", True)
if callable(old):
old(sig, frame)
signal.signal(signal.SIGINT, helper)
@app.on_event("startup")
def startup() -> None:
    """Install the shutdown-flag signal hook when the application starts.

    NOTE(review): recent FastAPI releases deprecate ``on_event`` in favour
    of lifespan handlers -- confirm against the FastAPI version in use.
    """
    append_signal()
@app.get("/")
async def root() -> JSONResponse:
    """Demo endpoint that polls the shutdown flag once per second.

    Prints the current value of ``app._shutting_down`` fifteen times,
    sleeping one second after each print, then answers with
    ``{"status": "ok"}``.
    """
    poll_count = 15
    for _tick in range(poll_count):
        shutting_down = getattr(app, "_shutting_down")
        print(shutting_down)
        # Yield to the event loop between polls.
        await asyncio.sleep(1)

    payload = {"status": "ok"}
    return JSONResponse(content=payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment