Useful when a long-running task (for example, work done by a child process) must stop what it is doing as soon as a termination signal is received, rather than only once the service itself is shutting down.
Last active
April 19, 2024 09:01
-
-
Save justcallmelarry/30d9e877c5637b3fd2ae83af25e3fa41 to your computer and use it in GitHub Desktop.
FastAPI signal handler
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import signal | |
from types import FrameType | |
from fastapi import FastAPI | |
from fastapi.responses import JSONResponse | |
# The ASGI application. A private ``_shutting_down`` attribute is attached to
# it dynamically and starts out False; the signal wrapper sets it to True and
# the demo endpoint reads it back.
app = FastAPI(title="example-service")
app._shutting_down = False  # type: ignore[attr-defined]
def append_signal() -> None: | |
old = signal.getsignal(signal.SIGINT) | |
def helper(sig: int, frame: FrameType | None) -> None: | |
setattr(app, "_shutting_down", True) | |
if callable(old): | |
old(sig, frame) | |
signal.signal(signal.SIGINT, helper) | |
@app.on_event("startup")
def startup() -> None:
    """Install the shutdown-flag signal wrapper when the app starts up."""
    append_signal()
@app.get("/")
async def root() -> JSONResponse:
    """Demo endpoint that shows the shutdown flag changing over time.

    Prints the current value of ``app._shutting_down`` once per second,
    fifteen times in total, then answers with ``{"status": "ok"}``.
    """
    ticks_left = 15
    while ticks_left > 0:
        shutting_down = getattr(app, "_shutting_down")
        print(shutting_down)
        # Yield to the event loop for one second between samples.
        await asyncio.sleep(1)
        ticks_left -= 1
    payload = {"status": "ok"}
    return JSONResponse(payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment