Skip to content

Instantly share code, notes, and snippets.

@srinivasiyer
Created June 16, 2022 03:34
Show Gist options
  • Save srinivasiyer/a80cfad1a08bb42ec969264575c69c0e to your computer and use it in GitHub Desktop.
Save srinivasiyer/a80cfad1a08bb42ec969264575c69c0e to your computer and use it in GitHub Desktop.
Datadog Route Middleware in Python for FastAPI
from typing import Awaitable, Callable, Dict, Optional, Union
from ddtrace import tracer
from fastapi import Request, Response
from starlette.routing import Match, Scope
from constants.datadog import DatadogOperation, DatadogSpanType
class RouteTraceMiddleware:
    """Request middleware that wraps each HTTP request in a Datadog span.

    The span's resource is ``"<METHOD> <route path>"``, where the route path
    is the ``path`` attribute of the application route matching the request
    scope. HTTP method, URL, query string, protocol/ASGI versions, request
    headers and the response status code are attached to the span as tags.
    """

    # Ports implied by the scheme; these are left out of the ``http.url`` tag.
    _DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}

    async def handle(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Trace a single request and forward it to the next handler.

        Args:
            request: Incoming request; only its ASGI ``scope`` is inspected.
            call_next: Awaitable callable that produces the response.

        Returns:
            The response returned by ``call_next``, unmodified.

        Raises:
            Whatever ``call_next`` raises. The exception is recorded on the
            span before it propagates.
        """
        scope = request.scope
        method = scope.get("method")
        route_path = self.__get_route_resource(scope=scope)
        # Join only the parts that exist so an unmatched route does not yield
        # a resource such as "GET None".
        resource = " ".join(part for part in (method, route_path) if part) or None

        # As a context manager the span is always finished, and an exception
        # escaping the block is tagged on the span as an error.
        with tracer.trace(
            name=DatadogOperation.FASTAPI_REQUEST.value,
            span_type=DatadogSpanType.WEB.value,
            resource=resource,
        ) as span:
            if method:
                span.set_tag("http.method", method)

            url = self.__get_url_from_scope(scope=scope)
            if url is not None:
                span.set_tag("http.url", url)

            query_string = self.__get_query_string_from_scope(scope=scope)
            if query_string is not None:
                span.set_tag("http.query.string", query_string)

            span.set_tags(self.__extract_versions_from_scope(scope))
            span.set_tags(self.__extract_headers_from_scope(scope=scope))

            response = await call_next(request)
            span.set_tag("http.status_code", response.status_code)
            return response

    @staticmethod
    def __get_route_resource(scope: Scope) -> Optional[str]:
        """Return the ``path`` of the route that matches ``scope``.

        A full match wins immediately; otherwise the first partial match is
        used. Returns ``None`` when nothing matches, or when the scope has no
        ``app`` entry exposing ``routes``.
        """
        app = scope.get("app")
        partial_path = None
        for route in getattr(app, "routes", ()):
            match, _ = route.matches(scope)
            if match == Match.FULL:
                # Not every route type has a ``path`` attribute, hence getattr.
                return getattr(route, "path", None)
            if match == Match.PARTIAL and partial_path is None:
                partial_path = getattr(route, "path", None)
        return partial_path

    @staticmethod
    def __get_url_from_scope(scope: Scope) -> Optional[str]:
        """Build ``scheme://host[:port]/root_path/path`` from ``scope``.

        The port is omitted when it is ``None`` or equals the default port of
        the scheme (80 for http/ws, 443 for https/wss). Returns ``None`` when
        ``scope["server"]`` is missing or is not a two-item sequence.
        """
        server = scope.get("server")
        if not server or len(server) != 2:
            return None

        host, port = server
        scheme = scope.get("scheme", "http")
        default_port = RouteTraceMiddleware._DEFAULT_PORTS.get(scheme)
        if port is None or port == default_port:
            netloc = host
        else:
            netloc = f"{host}:{port}"

        full_path = scope.get("root_path", "") + scope.get("path", "")
        return f"{scheme}://{netloc}{full_path}"

    @staticmethod
    def __get_query_string_from_scope(scope: Scope) -> Optional[str]:
        """Return the query string as text, or ``None`` if absent or empty."""
        raw = scope.get("query_string")
        if not raw:
            return None
        if isinstance(raw, bytes):
            # Malformed bytes from a client must not fail the request just
            # because it is being traced.
            return raw.decode("utf-8", errors="replace")
        return raw

    @staticmethod
    def __extract_versions_from_scope(scope: Scope) -> Dict[Union[str, bytes], str]:
        """Collect ``http.version``, ``asgi.version`` and ``asgi.spec_version`` tags.

        Only the values actually present in ``scope`` are included.
        """
        tags: Dict[Union[str, bytes], str] = {}

        http_version = scope.get("http_version")
        if http_version:
            tags["http.version"] = http_version

        asgi_info = scope.get("asgi")
        if asgi_info:
            if "version" in asgi_info:
                tags["asgi.version"] = asgi_info["version"]
            if "spec_version" in asgi_info:
                tags["asgi.spec_version"] = asgi_info["spec_version"]

        return tags

    @staticmethod
    def __extract_headers_from_scope(scope: Scope) -> Dict[Union[str, bytes], str]:
        """Turn request headers into ``http.headers.<name>`` tags.

        ``scope["headers"]`` is iterated as ``(name, value)`` pairs; byte
        strings are decoded to text. When a header name repeats, the last
        value wins. Returns an empty dict when there are no headers.
        """
        headers = scope.get("headers")
        if not headers:
            return {}

        def as_text(value):
            # Invalid bytes are replaced rather than raising, so a bad header
            # cannot break the request.
            if isinstance(value, bytes):
                return value.decode("utf-8", errors="replace")
            return value

        # NOTE(review): every header is exported, including Authorization and
        # Cookie. Consider redacting credentials before they reach the tracer.
        return {
            f"http.headers.{as_text(name)}": as_text(value)
            for name, value in headers
        }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment