Skip to content

Instantly share code, notes, and snippets.

@RadoslawB
Created December 13, 2022 15:37
Show Gist options
  • Save RadoslawB/780572fa7ee424108741f9523b37b1d2 to your computer and use it in GitHub Desktop.
Save RadoslawB/780572fa7ee424108741f9523b37b1d2 to your computer and use it in GitHub Desktop.
FastAPI logging middleware
import datetime
import json
import logging
import time
import traceback
from json import JSONDecodeError
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from api.EndpointPath import EndpointPath
from api.Logger import Logger
class LoggingHttpMiddleware:
__excluded_paths = {'/health'}
def __init__(self, logger: Logger):
self.logger = logger
logging.basicConfig(format="%(message)s")
async def __call__(self, request: Request, call_next):
if self._is_excluded_path(request.url.path):
return await call_next(request)
return await self.capture_logs(call_next, request)
async def capture_logs(self, call_next, request) -> Response:
logs = await self.create_request_logs(request)
time_start = time.monotonic()
try:
return await self.append_response_properties(call_next, logs, request)
except Exception:
return await self.append_on_error_properties(logs)
finally:
time_end = time.monotonic()
logs["request_duration"] = time_end - time_start
await self.log(logs)
async def create_request_logs(self, request: Request) -> dict:
return {
"request": {
"user_agent": request.headers["user-agent"],
"params": dict(request.query_params),
"url_path": request.url.path,
"method": request.method,
},
"response": {},
"@timestamp": datetime.datetime.now().isoformat(),
}
async def log(self, log: dict) -> None:
pretty_json = json.dumps(log, indent=4, sort_keys=True)
self.logger.info(pretty_json)
async def get_body_from_streaming_response(self, response) -> str:
body = [i async for i in response.body_iterator]
body = body[0].decode() if len(body) == 1 else ""
return body
async def append_response_properties(
self, call_next, log: dict, request: Request
) -> Response:
response: StreamingResponse = await call_next(request)
body: str = await self.get_body_from_streaming_response(response)
log["response"]["body"] = self._parse_body(body)
log["response"]["status_code"] = response.status_code
headers = dict(response.headers)
headers["content-length"] = str(len(body.encode()))
return Response(
content=body,
status_code=response.status_code,
headers=headers,
media_type=response.media_type,
)
async def append_on_error_properties(self, log: dict) -> Response:
status_code = 500
traceback_ = traceback.format_exc().split("\n")
log["response"]["status_code"] = status_code
log["traceback"] = traceback_
return Response("Internal server error", status_code=status_code)
def _parse_body(self, body):
try:
return json.loads(body)
except (JSONDecodeError, TypeError):
return body
def _is_excluded_path(self, path: str):
return path in self.__excluded_paths
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment