Skip to content

Instantly share code, notes, and snippets.

@mrthere3
Forked from dishwad/logger.py
Created May 13, 2024 12:00
Show Gist options
  • Save mrthere3/c51969e61fbb589cd7bdba49066bfc1a to your computer and use it in GitHub Desktop.
Save mrthere3/c51969e61fbb589cd7bdba49066bfc1a to your computer and use it in GitHub Desktop.
Logging configuration for FastAPI using Gunicorn + Uvicorn workers
import logging
import sys
import time
from typing import Callable
from fastapi import Request, Response
from fastapi.routing import APIRoute
from gunicorn.glogging import Logger
from loguru import logger
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
# Has the environment variables
from app.core.settings import settings
# Loguru format string shared by every sink configured in this module:
# timestamp | padded level | name:function:line - message, followed by the
# record's "extra" dict.
LOGGER_FORMAT: str = (
    "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
    "<level>{level: <7}</level> | "
    "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
    "<level>{message}</level> <level>{extra}</level>"
)
class InterceptHandler(logging.Handler):
    """Standard-library logging handler that re-emits records through Loguru."""

    def emit(self, record: logging.LogRecord) -> None:
        """Forward *record* to the Loguru ``logger``.

        The record's level name is mapped to the Loguru level of the same
        name when one exists; otherwise the numeric level is used. The
        message is attributed to the frame that issued the original logging
        call, and any exception info on the record is passed along.
        """
        # Get the corresponding Loguru level if it exists.
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # Find the caller the logged message originated from. Start at this
        # frame and walk up past every frame that belongs to the logging
        # module, rather than assuming a fixed stack depth: a fixed offset
        # raises ValueError on a shallower stack and can skip the real
        # caller when the record reaches the handler by a shorter path.
        frame, depth = sys._getframe(0), 0
        while frame and (
            depth == 0 or frame.f_code.co_filename == logging.__file__
        ):
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )
class GunicornLogger(Logger):
    """Gunicorn logger class whose error and access logs feed into Loguru."""

    def setup(self, cfg) -> None:
        """Attach the intercept handler to both Gunicorn logs and configure Loguru.

        ``cfg`` is accepted for interface compatibility but is not read here;
        the level comes from ``settings.LOG_LEVEL``. The parent ``setup`` is
        not invoked.
        """
        intercept = InterceptHandler()

        # Same handler instance and level for the error log, then the access log.
        for gunicorn_log in (self.error_log, self.access_log):
            gunicorn_log.addHandler(intercept)
            gunicorn_log.setLevel(settings.LOG_LEVEL)

        # Configure Loguru before Gunicorn starts logging.
        stdout_sink = {
            "sink": sys.stdout,
            "level": settings.LOG_LEVEL,
            "format": LOGGER_FORMAT,
        }
        logger.configure(handlers=[stdout_sink])
def configure_logger() -> None:
    """Route standard-library logging through Loguru and set up the stdout sink.

    The root logger gets a single ``InterceptHandler`` at
    ``settings.LOG_LEVEL``. Every already-registered logger whose name does
    not contain "gunicorn" has its handlers cleared and propagation enabled,
    so its records reach the root handler.
    """
    root = logging.root
    root.handlers = [InterceptHandler()]
    root.setLevel(settings.LOG_LEVEL)

    # Strip handlers from the registered loggers and let them propagate to
    # the root logger; gunicorn loggers are left untouched.
    for name in root.manager.loggerDict:
        if "gunicorn" in name:
            continue
        registered = logging.getLogger(name)
        registered.handlers = []
        registered.propagate = True

    # The sqlalchemy engine logger is set to the configured level explicitly
    # (by default it is at WARNING level).
    logging.getLogger("sqlalchemy.engine").setLevel(settings.LOG_LEVEL)

    # Configure Loguru here as well, for the case where gunicorn is not used.
    stdout_sink = {
        "sink": sys.stdout,
        "level": settings.LOG_LEVEL,
        "format": LOGGER_FORMAT,
    }
    logger.configure(handlers=[stdout_sink])
class LoggingRoute(APIRoute):
    """APIRoute that logs every request/response pair in a background task.

    Request and response details are collected around the original route
    handler, and the actual log calls are scheduled as a background task on
    the returned response.
    """

    def get_route_handler(self) -> Callable:
        """Return the route handler wrapped with request/response logging."""
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            req_body = await request.body()
            # request.client can be None, so do not dereference it blindly.
            client = request.client
            req_info: dict = {
                "method": request.method,
                "path": request.url.path,
                "host": client.host if client else None,
                "query_params": request.query_params,
                "body": req_body.decode(errors="replace"),
            }

            before = time.perf_counter()
            response = await original_route_handler(request)
            duration = (time.perf_counter() - before) * 1000

            res_info: dict = {
                "status": "successful" if response.status_code < 400 else "failed",
                "status_code": response.status_code,
                "duration": f"{duration:0.2f}ms",
            }

            if isinstance(response, StreamingResponse):
                # The stream is drained so its body can be captured; the
                # buffered content is then returned as a plain Response.
                chunks = []
                async for chunk in response.body_iterator:
                    # Iterators may yield text; encode it before joining.
                    if isinstance(chunk, str):
                        chunk = chunk.encode(response.charset)
                    chunks.append(chunk)
                res_body = b"".join(chunks)
                res_info["body"] = res_body.decode(errors="replace")
                # NOTE(review): dict() keeps one value per header name, so
                # repeated headers (e.g. several Set-Cookie) would collapse;
                # confirm whether that matters for the streamed endpoints.
                return Response(
                    content=res_body,
                    status_code=response.status_code,
                    headers=dict(response.headers),
                    media_type=response.media_type,
                    background=self._attach_log_task(
                        response.background, req_info, res_info
                    ),
                )

            # Not every response class exposes a pre-rendered ``body``.
            res_body = getattr(response, "body", b"")
            res_info["body"] = bytes(res_body).decode(errors="replace")
            response.background = self._attach_log_task(
                response.background, req_info, res_info
            )
            return response

        return custom_route_handler

    def _attach_log_task(self, background, req_info: dict, res_info: dict):
        """Return a background object that also runs the logging call.

        ``background`` is whatever the response already carries: ``None``,
        an object with ``add_task`` (the logging call is appended to it), or
        a single task (wrapped so that it runs first, then the logging call).
        """
        log_task = BackgroundTask(
            self.log_request_response_info, req_info, res_info
        )
        if background is None:
            return log_task
        if hasattr(background, "add_task"):
            background.add_task(
                self.log_request_response_info, req_info, res_info
            )
            return background

        async def run_existing_then_log() -> None:
            await background()
            await log_task()

        return BackgroundTask(run_existing_then_log)

    def log_request_response_info(self, req_info: dict, res_info: dict):
        """Emit one INFO line for the request and one for the response.

        The structured fields are bound as Loguru "extra" values. The
        captured bodies in ``req_info`` / ``res_info`` are not logged.
        """
        method = req_info.get("method")
        path = req_info.get("path")
        query_params = req_info.get("query_params")
        url = f"{path}?{query_params}" if query_params else path

        # Values are passed as positional format arguments instead of being
        # interpolated into the template: Loguru runs str.format on the
        # message, so braces inside a request path would otherwise be
        # treated as placeholders and raise.
        logger.bind(
            method=method,
            path=path,
            host=req_info.get("host"),
            query_params=query_params,
        ).info("Request logged: {} {}", method, url)

        logger.bind(
            status=res_info.get("status"),
            status_code=res_info.get("status_code"),
            duration=res_info.get("duration"),
        ).info(
            "Response logged: {} {} {}",
            method,
            url,
            res_info.get("status_code"),
        )
from uvicorn.config import LOGGING_CONFIG
# Call this at the very top, before any other FastAPI code runs.
configure_logger()

if __name__ == "__main__":
    # Local entry point: this branch only runs via `python main.py`, where
    # the app is served by Uvicorn alone. On deployment the app is started
    # with Gunicorn and command-line arguments instead.
    # https://fastapi.tiangolo.com/deployment/server-workers/
    import uvicorn

    # NOTE(review): dict.update swaps out the entire top-level "loggers"
    # entry for a single entry named "handlers"; it does not touch the
    # top-level "handlers" section. Confirm this is the intended shape.
    intercept_override = {
        "loggers": {
            "handlers": {"default": {"class": "app.core.logging.InterceptHandler"}}
        }
    }
    LOGGING_CONFIG.update(intercept_override)

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        log_level=settings.LOG_LEVEL.lower(),
        reload=True,
        log_config=LOGGING_CONFIG,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment