Created
June 16, 2022 03:34
-
-
Save srinivasiyer/a80cfad1a08bb42ec969264575c69c0e to your computer and use it in GitHub Desktop.
Datadog Route Middleware in Python for FastAPI
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Awaitable, Callable, Dict, Optional, Union
from ddtrace import tracer
from fastapi import Request, Response
from starlette.routing import Match, Scope
from constants.datadog import DatadogOperation, DatadogSpanType
class RouteTraceMiddleware:
    """Wrap each HTTP request in a Datadog span named after the matched route.

    ``handle`` takes ``(request, call_next)``; it is presumably registered as
    an HTTP middleware function on the FastAPI app (the registration is not
    part of this file -- confirm against the caller).  The span resource is
    ``"<METHOD> <route path>"`` and the span is tagged with the HTTP method,
    URL, query string, protocol versions, request headers and response
    status code.
    """

    # Ports that are implied by the scheme and therefore left out of the
    # ``http.url`` tag.
    _DEFAULT_PORTS = {"http": 80, "https": 443, "ws": 80, "wss": 443}

    async def handle(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Trace one request/response cycle and return the downstream response.

        Args:
            request: The incoming request; only its ASGI ``scope`` is read.
            call_next: Awaitable callable that produces the response.

        Returns:
            The response returned by ``call_next``, unmodified.

        Raises:
            Whatever ``call_next`` raises; the exception is recorded on the
            span before it propagates.
        """
        scope = request.scope
        method = scope.get("method")
        resource = self.__format_resource(
            method, self.__get_route_resource(scope=scope)
        )

        # Entering the span as a context manager finishes it on every exit
        # path and, unlike a bare try/finally around ``finish()``, also
        # records the exception details when ``call_next`` raises.
        with tracer.trace(
            name=DatadogOperation.FASTAPI_REQUEST.value,
            span_type=DatadogSpanType.WEB.value,
            resource=resource,
        ) as span:
            if method:
                span.set_tag("http.method", method)

            url = self.__get_url_from_scope(scope=scope)
            if url is not None:
                span.set_tag("http.url", url)

            query_string = self.__get_query_string_from_scope(scope=scope)
            if query_string is not None:
                span.set_tag("http.query.string", query_string)

            span.set_tags(self.__extract_versions_from_scope(scope))
            span.set_tags(self.__extract_headers_from_scope(scope=scope))

            response = await call_next(request)
            span.set_tag("http.status_code", response.status_code)
            return response

    @staticmethod
    def __format_resource(
        method: Optional[str], route_path: Optional[str]
    ) -> Optional[str]:
        """Combine the HTTP method and route path into a span resource name.

        Returns ``"<method> <route_path>"`` when both are known, the method
        alone when no route matched (instead of the literal ``"GET None"``),
        the bare route path when the method is missing, and ``None`` when
        neither is available.
        """
        if not method:
            return route_path
        if route_path is None:
            return method
        return f"{method} {route_path}"

    @staticmethod
    def __get_route_resource(scope: Scope) -> Optional[str]:
        """Return the ``path`` of the route that matches ``scope``.

        Routes are taken from ``scope["app"].routes`` and checked in order.
        The first full match wins; if there is none, the first partial match
        is used.  Returns ``None`` when no route matches at all.
        """
        first_partial = None
        for route in scope["app"].routes:
            match, _ = route.matches(scope)
            if match == Match.FULL:
                return route.path
            if match == Match.PARTIAL and first_partial is None:
                # Remember it, but keep scanning for a full match.
                first_partial = route.path
        return first_partial

    @staticmethod
    def __get_url_from_scope(scope: Scope) -> Optional[str]:
        """Build ``scheme://host[:port]/path`` from the scope's ``server`` entry.

        The port is omitted when it is ``None`` or is the default port for the
        scheme (80 for http/ws, 443 for https/wss).  The scheme defaults to
        ``"http"`` and the path is ``root_path`` followed by ``path``.
        Returns ``None`` when ``server`` is missing or is not a 2-item
        sequence.
        """
        server = scope.get("server")
        if not server or len(server) != 2:
            return None

        host, port = server[0], server[1]
        scheme = scope.get("scheme", "http")
        default_port = RouteTraceMiddleware._DEFAULT_PORTS.get(scheme)
        if port is None or port == default_port:
            server_host = host
        else:
            server_host = host + ":" + str(port)

        full_path = scope.get("root_path", "") + scope.get("path", "")
        return scheme + "://" + server_host + full_path

    @staticmethod
    def __to_text(value: Union[str, bytes]) -> str:
        """Decode ``bytes`` as UTF-8, leaving other values untouched.

        Undecodable bytes become U+FFFD rather than raising, so a malformed
        request cannot make the tracing middleware fail the request.
        """
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="replace")
        return value

    @staticmethod
    def __get_query_string_from_scope(scope: Scope) -> Optional[str]:
        """Return the scope's query string as text, or ``None`` if absent/empty."""
        raw = scope.get("query_string")
        if not raw:
            return None
        return RouteTraceMiddleware.__to_text(raw)

    @staticmethod
    def __extract_versions_from_scope(scope: Scope) -> Dict[Union[str, bytes], str]:
        """Collect ``http.version``, ``asgi.version`` and ``asgi.spec_version`` tags.

        Each tag is included only when the corresponding value is present in
        the scope.
        """
        tags: Dict[Union[str, bytes], str] = {}

        http_version = scope.get("http_version")
        if http_version:
            tags["http.version"] = http_version

        asgi_info = scope.get("asgi") or {}
        for key in ("version", "spec_version"):
            if key in asgi_info:
                tags[f"asgi.{key}"] = asgi_info[key]
        return tags

    @staticmethod
    def __extract_headers_from_scope(scope: Scope) -> Dict[Union[str, bytes], str]:
        """Turn the scope's request headers into ``http.headers.<name>`` tags.

        ``scope["headers"]`` is iterated as ``(name, value)`` pairs; ``bytes``
        items are decoded to text.  When a header name repeats, the last
        value wins.  Returns an empty dict when there are no headers.
        """
        # NOTE(review): every header is exported, which includes credentials
        # such as Authorization and Cookie -- confirm this is intended or
        # filter them before tagging.
        to_text = RouteTraceMiddleware.__to_text
        return {
            f"http.headers.{to_text(name)}": to_text(value)
            for name, value in (scope.get("headers") or ())
        }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment