Created
December 13, 2022 15:37
-
-
Save RadoslawB/780572fa7ee424108741f9523b37b1d2 to your computer and use it in GitHub Desktop.
FastAPI logging middleware
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import json | |
import logging | |
import time | |
import traceback | |
from json import JSONDecodeError | |
from starlette.requests import Request | |
from starlette.responses import Response, StreamingResponse | |
from api.EndpointPath import EndpointPath | |
from api.Logger import Logger | |
class LoggingHttpMiddleware: | |
__excluded_paths = {'/health'} | |
def __init__(self, logger: Logger): | |
self.logger = logger | |
logging.basicConfig(format="%(message)s") | |
async def __call__(self, request: Request, call_next): | |
if self._is_excluded_path(request.url.path): | |
return await call_next(request) | |
return await self.capture_logs(call_next, request) | |
async def capture_logs(self, call_next, request) -> Response: | |
logs = await self.create_request_logs(request) | |
time_start = time.monotonic() | |
try: | |
return await self.append_response_properties(call_next, logs, request) | |
except Exception: | |
return await self.append_on_error_properties(logs) | |
finally: | |
time_end = time.monotonic() | |
logs["request_duration"] = time_end - time_start | |
await self.log(logs) | |
async def create_request_logs(self, request: Request) -> dict: | |
return { | |
"request": { | |
"user_agent": request.headers["user-agent"], | |
"params": dict(request.query_params), | |
"url_path": request.url.path, | |
"method": request.method, | |
}, | |
"response": {}, | |
"@timestamp": datetime.datetime.now().isoformat(), | |
} | |
async def log(self, log: dict) -> None: | |
pretty_json = json.dumps(log, indent=4, sort_keys=True) | |
self.logger.info(pretty_json) | |
async def get_body_from_streaming_response(self, response) -> str: | |
body = [i async for i in response.body_iterator] | |
body = body[0].decode() if len(body) == 1 else "" | |
return body | |
async def append_response_properties( | |
self, call_next, log: dict, request: Request | |
) -> Response: | |
response: StreamingResponse = await call_next(request) | |
body: str = await self.get_body_from_streaming_response(response) | |
log["response"]["body"] = self._parse_body(body) | |
log["response"]["status_code"] = response.status_code | |
headers = dict(response.headers) | |
headers["content-length"] = str(len(body.encode())) | |
return Response( | |
content=body, | |
status_code=response.status_code, | |
headers=headers, | |
media_type=response.media_type, | |
) | |
async def append_on_error_properties(self, log: dict) -> Response: | |
status_code = 500 | |
traceback_ = traceback.format_exc().split("\n") | |
log["response"]["status_code"] = status_code | |
log["traceback"] = traceback_ | |
return Response("Internal server error", status_code=status_code) | |
def _parse_body(self, body): | |
try: | |
return json.loads(body) | |
except (JSONDecodeError, TypeError): | |
return body | |
def _is_excluded_path(self, path: str): | |
return path in self.__excluded_paths |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment