Skip to content

Instantly share code, notes, and snippets.

@phillipuniverse
Last active December 4, 2022 15:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phillipuniverse/f666a2bf8e74d7b2f409ac2990df6762 to your computer and use it in GitHub Desktop.
Save phillipuniverse/f666a2bf8e74d7b2f409ac2990df6762 to your computer and use it in GitHub Desktop.
OpenTelemetry bridge to Datadog Profiler

Supporting code for some exploration on DataDog/dd-trace-py#3819.

This was attempted in the context of a Django app. The settings.py file below corresponds to the project's main Django settings.py, but the code should be extractable into other library entry points.

import opentelemetry.trace as trace_api
from ddtrace.constants import ENV_KEY, SAMPLE_RATE_METRIC_KEY, VERSION_KEY
from ddtrace.ext import SpanTypes as DatadogSpanTypes
from ddtrace.span import Span as DatadogSpan
from opentelemetry.sdk.resources import DEPLOYMENT_ENVIRONMENT, SERVICE_VERSION, Resource
from opentelemetry.sdk.trace import ReadableSpan, sampling
from opentelemetry.semconv.trace import SpanAttributes
from shipwell_common_python.tracing.contrib.datadog.constants import (
DD_ERROR_MSG_TAG_KEY,
DD_ERROR_STACK_TAG_KEY,
DD_ERROR_TYPE_TAG_KEY,
DD_ORIGIN,
EVENT_NAME_EXCEPTION,
EXCEPTION_MSG_ATTR_KEY,
EXCEPTION_STACK_ATTR_KEY,
EXCEPTION_TYPE_ATTR_KEY,
SERVICE_NAME_TAG,
)
# Maps an OpenTelemetry instrumentation library name to the Datadog span type
# used for spans produced by that library. Looked up by _get_span_type();
# instrumentation names that are not listed here yield no span type (None).
_INSTRUMENTATION_SPAN_TYPES = {
"opentelemetry.instrumentation.aiohttp-client": DatadogSpanTypes.HTTP,
"opentelemetry.instrumentation.asgi": DatadogSpanTypes.WEB,
"opentelemetry.instrumentation.dbapi": DatadogSpanTypes.SQL,
"opentelemetry.instrumentation.django": DatadogSpanTypes.WEB,
"opentelemetry.instrumentation.flask": DatadogSpanTypes.WEB,
"opentelemetry.instrumentation.grpc": DatadogSpanTypes.GRPC,
"opentelemetry.instrumentation.jinja2": DatadogSpanTypes.TEMPLATE,
"opentelemetry.instrumentation.mysql": DatadogSpanTypes.SQL,
"opentelemetry.instrumentation.psycopg2": DatadogSpanTypes.SQL,
"opentelemetry.instrumentation.pymemcache": DatadogSpanTypes.CACHE,
"opentelemetry.instrumentation.pymongo": DatadogSpanTypes.MONGODB,
"opentelemetry.instrumentation.pymysql": DatadogSpanTypes.SQL,
"opentelemetry.instrumentation.redis": DatadogSpanTypes.REDIS,
"opentelemetry.instrumentation.requests": DatadogSpanTypes.HTTP,
"opentelemetry.instrumentation.sqlalchemy": DatadogSpanTypes.SQL,
"opentelemetry.instrumentation.wsgi": DatadogSpanTypes.WEB,
}
"""
IMPORTANT IMPLEMENTATION NOTE ON THIS FILE
This is all cobbled together from an existing OpenTelemetry -> Datadog bridge used in the now-removed
Datadog Exporter/Span Processor that lived in OpenTelemetry.
The jumping off point was from https://github.com/open-telemetry/opentelemetry-python-contrib/blob/93b8398b6068728f999247ad53275d487374f116/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py#L122-L193
but additional code was borrowed from other areas in this otel package at https://github.com/open-telemetry/opentelemetry-python-contrib/tree/93b8398b6068728f999247ad53275d487374f116/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog
The alternative would be to translate as little as possible (e.g. only the span id and trace id),
which would be enough to link correctly to Datadog profile results, but translating the full span is somewhat future-proof, just in case.
"""
def translate_otel_to_datadog(span: ReadableSpan) -> DatadogSpan:
    """
    Takes an OpenTelemetry span and converts it to a Datadog span for use in the profiler.

    This is all copy/pasted and cobbled together from prior art from DD contributions to OpenTelemetry
    (which has since been removed) at https://github.com/open-telemetry/opentelemetry-python-contrib/blob/93b8398b6068728f999247ad53275d487374f116/exporter/opentelemetry-exporter-datadog/src/opentelemetry/exporter/datadog/exporter.py#L122-L193

    The env and version tags are read from the span's resource attributes and are
    only set when the resource actually declares them, so a resource without
    a deployment environment or service version no longer raises ``KeyError``.

    :param span: the OpenTelemetry span to translate
    :return: a new Datadog span built from the ids, timing, error state and tags of ``span``
    """
    trace_id, parent_id, span_id = _get_trace_ids(span)
    is_root_span = parent_id == 0

    # extract resource attributes to be used as tags as well as potential service name
    resource_tags, resource_service_name = _extract_tags_from_resource(span.resource)

    datadog_span = DatadogSpan(
        _get_span_name(span),
        service=resource_service_name,
        resource=_get_resource(span),
        span_type=_get_span_type(span),
        trace_id=trace_id,
        span_id=span_id,
        parent_id=parent_id,
    )

    datadog_span.start_ns = span.start_time
    # a span that has not ended yet has no end time, hence no duration
    if span.end_time:
        datadog_span.duration_ns = span.end_time - span.start_time

    if not span.status.is_ok:
        datadog_span.error = 1
        # loop over events and look for exception events, extract info.
        # https://github.com/open-telemetry/opentelemetry-python/blob/71e3a7a192c0fc8a7503fac967ada36a74b79e58/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py#L810-L819
        if span.events:
            _extract_tags_from_exception_events(span.events, datadog_span)

    # combine resource attributes and span attributes, don't modify existing span attributes
    combined_span_tags = {}
    combined_span_tags.update(resource_tags)
    combined_span_tags.update(span.attributes)
    datadog_span.set_tags(combined_span_tags)

    resource_attributes = span.resource.attributes

    # add configured env tag (skipped when the resource does not declare one)
    env = resource_attributes.get(DEPLOYMENT_ENVIRONMENT)
    if env is not None:
        datadog_span.set_tag(ENV_KEY, env)

    # add configured application version tag to only root span
    version = resource_attributes.get(SERVICE_VERSION)
    if version is not None and is_root_span:
        datadog_span.set_tag(VERSION_KEY, version)

    # add origin to root span
    origin = _get_origin(span)
    if origin and is_root_span:
        datadog_span.set_tag(DD_ORIGIN, origin)

    sampling_rate = _get_sampling_rate(span)
    if sampling_rate is not None:
        datadog_span.set_metric(SAMPLE_RATE_METRIC_KEY, sampling_rate)

    # span events and span links are not supported except for extracting exception event context
    return datadog_span
def _get_trace_ids(span):
    """Return ``(trace_id, parent_id, span_id)`` for the given span.

    The trace id is reduced to 64 bits; the parent id is 0 when the span's
    parent is neither a span nor a span context.
    """
    context = span.get_span_context()
    parent = span.parent

    if isinstance(parent, trace_api.Span):
        parent_id = parent.get_span_context().span_id
    elif isinstance(parent, trace_api.SpanContext):
        parent_id = parent.span_id
    else:
        # no usable parent: report the span as a root
        parent_id = 0

    return _convert_trace_id_uint64(context.trace_id), parent_id, context.span_id
def _convert_trace_id_uint64(otel_id):
"""Convert 128-bit int used for trace_id to 64-bit unsigned int"""
return otel_id & 0xFFFFFFFFFFFFFFFF
def _get_span_name(span):
"""Get span name by using instrumentation and kind while backing off to
span.name
"""
instrumentation_name = span.instrumentation_info.name if span.instrumentation_info else None
span_kind_name = span.kind.name if span.kind else None
name = (
f"{instrumentation_name}.{span_kind_name}"
if instrumentation_name and span_kind_name
else span.name
)
return name
def _get_resource(span):
    """Return the Datadog resource name for the span.

    Spans carrying an HTTP method attribute use ``"<method> <route>"`` (or just
    the method when no route is set); every other span uses ``span.name``.
    """
    attributes = span.attributes
    if SpanAttributes.HTTP_METHOD not in attributes:
        return span.name

    method = attributes[SpanAttributes.HTTP_METHOD]
    route = attributes.get(SpanAttributes.HTTP_ROUTE)
    if route:
        return method + " " + route
    return method
def _get_span_type(span):
    """Look up the Datadog span type for the span's instrumentation library.

    Returns None when the instrumentation name has no entry in the mapping.
    """
    info = span.instrumentation_info
    library = None
    if info:
        library = info.name
    return _INSTRUMENTATION_SPAN_TYPES.get(library)
def _get_exc_info(span):
"""Parse span status description for exception type and value"""
exc_type, exc_val = span.status.description.split(":", 1)
return exc_type, exc_val.strip()
def _get_origin(span):
    """Return the Datadog origin entry from the span context's trace state, if any."""
    trace_state = span.get_span_context().trace_state
    return trace_state.get(DD_ORIGIN)
def _get_sampling_rate(span):
    """Return the sampling rate to report for the span, or None.

    A rate is only returned when the global tracer provider exposes a
    ``sampler`` that is a ``TraceIdRatioBased`` sampler and the span's
    context is flagged as sampled.
    """
    span_context = span.get_span_context()
    provider = trace_api.get_tracer_provider()
    if not hasattr(provider, "sampler"):
        return None

    active_sampler = provider.sampler
    if span_context.trace_flags.sampled and isinstance(
        active_sampler, sampling.TraceIdRatioBased
    ):
        return active_sampler.rate
    return None
def _extract_tags_from_resource(resource: Resource):
    """Split resource attributes into plain tags and the service name.

    The service name attribute has special significance within datadog, so it
    is returned separately instead of being included in the tags.

    Returns a two-item list: ``[tags, service_name]`` where ``service_name``
    is None when the resource does not carry one.
    """
    attributes = resource.attributes
    service_name = attributes.get(SERVICE_NAME_TAG)
    tags = {
        key: value for key, value in attributes.items() if key != SERVICE_NAME_TAG
    }
    return [tags, service_name]
def _extract_tags_from_exception_events(events, datadog_span):
    """Copy exception details from exception events onto the Datadog span.

    error.msg, error.type and error.stack have special significance within
    datadog, so the matching exception event attributes are set as those tags.
    Events that are not exception events, and other attributes, are ignored.
    """
    # exception event attribute key -> Datadog error tag key
    error_tag_for_attribute = {
        EXCEPTION_TYPE_ATTR_KEY: DD_ERROR_TYPE_TAG_KEY,
        EXCEPTION_MSG_ATTR_KEY: DD_ERROR_MSG_TAG_KEY,
        EXCEPTION_STACK_ATTR_KEY: DD_ERROR_STACK_TAG_KEY,
    }
    for event in events:
        if event.name is None or event.name != EVENT_NAME_EXCEPTION:
            continue
        for attribute_key, attribute_value in event.attributes.items():
            error_tag = error_tag_for_attribute.get(attribute_key)
            if error_tag is not None:
                datadog_span.set_tag(error_tag, attribute_value)
##
## OTHER DJANGO SETTINGS HERE
##
import opentelemetry.context as otel_context_api
from opentelemetry.trace.propagation import get_current_span as otel_get_current_span
from opentelemetry.context.context import Context
import ddtrace
import opentelemetry
import opentelemetry.trace as trace
from ddtrace.context import Context as DatadogContext
from ddtrace.profiling import Profiler
from ddtrace.profiling.event import StackBasedEvent
from ddtrace.profiling.recorder import Recorder
from ddtrace.provider import DefaultContextProvider
from ddtrace.span import Span as DatadogSpan
from opentelemetry.sdk.trace import ReadableSpan, Span
from common.datadog.otel_bridge import translate_otel_to_datadog
class OtelBridgeContextProvider(DefaultContextProvider):
    """Datadog context provider that can be activated with OpenTelemetry spans."""

    def activate_otel(self, span: Span) -> None:
        """Translate ``span`` to a Datadog span and activate it on this provider.

        The translated value may be None (see ``to_datadog_span``); it is passed
        to ``activate`` as-is.
        """
        datadog_span = to_datadog_span(span)
        return super().activate(datadog_span)
def to_datadog_span(span: Span) -> DatadogSpan | None:
    """Translate an OpenTelemetry span to a Datadog span, when possible.

    :param span: the OpenTelemetry span to translate
    :return: the translated Datadog span, or None when ``span`` carries the
        invalid span context or is not a ``ReadableSpan`` (for example a
        non-recording span), because such a span has no data to translate
    """
    span_context = span.get_span_context()
    if span_context == trace.INVALID_SPAN_CONTEXT:
        return None
    # This runs inside the patched context attach, so a span that cannot be
    # translated must not raise (and an assert would be stripped under -O).
    if not isinstance(span, ReadableSpan):
        return None
    return translate_otel_to_datadog(span)
# Single shared provider instance; it is activated from dd_context_attach and
# exposed to the profiler as OtelToDdTracer.context_provider.
otel_bridge_context_provider = OtelBridgeContextProvider()
# Keep references to the original OpenTelemetry context functions before
# `attach` is replaced below, so the wrapper can delegate to the original.
orig_otel_attach = otel_context_api.attach
# NOTE(review): the original detach is saved but not used in the code visible here.
orig_otel_detach = otel_context_api.detach
def dd_context_attach(context: Context) -> object:
    """Attach ``context`` through the original OpenTelemetry ``attach`` and
    mirror its current span into the Datadog bridge context provider.

    Returns the token produced by the original ``attach``.
    """
    # TODO: attaching works, need a way to correctly interact with the DD context
    # and provide updates to the DD span. Otherwise it never gets released
    token = orig_otel_attach(context)
    attached_span = otel_get_current_span(context)
    otel_bridge_context_provider.activate_otel(attached_span)
    return token
# Monkeypatch: replace `attach` on the opentelemetry.context module so that
# callers looking it up on that module go through dd_context_attach.
# NOTE(review): code that imported `attach` by name before this line runs
# presumably keeps the original function - confirm.
otel_context_api.attach = dd_context_attach
class OtelToDdTracer:
    """Minimal stand-in for ``ddtrace.tracer.Tracer`` that is handed to the Datadog profiler."""

    # Simulating the ddtrace trace from ddtrace.tracer.Tracer
    #
    # Current complication - this context provider is very, very important for the StackCollector
    # to figure out what the active span is for a particular thread. There is a registration process
    # that happens but will only be retrieved if this context is set. This context is only set
    # when we are using the Datadog tracer to start a trace.
    #
    # There are additional complexities because the profiles we _really_ care about drop down into
    # CPython implementations so there isn't anything we can truly monkeypatch.
    # The thread-to-span mapping is maintained in the DD _ThreadSpanLinks at
    # https://github.com/DataDog/dd-trace-py/blob/b55b165c465b370d15f1046ff96e46c7d5800247/ddtrace/profiling/collector/stack.pyx#L388-L417
    # and has a very deep tie with this context provider by registering an 'on_activate' hook
    # https://github.com/DataDog/dd-trace-py/blob/b55b165c465b370d15f1046ff96e46c7d5800247/ddtrace/profiling/collector/stack.pyx#L456
    # The deep tie is that the 'on_activate' hook is called whenever a span is activated by the
    # tracer and becomes the current trace.
    #
    # The challenge will be to figure out how to register a similar sort of hook within open telemetry.
    # So when a span is activated on the otel side we need to have this active context provider
    # that can call the hooks that were registered by the StackCollector (something like that)
    context_provider = otel_bridge_context_provider

    def current_span(self) -> DatadogSpan | None:
        """Return the current OpenTelemetry span translated to a Datadog span (or None)."""
        active_otel_span = trace.get_current_span()
        return to_datadog_span(active_otel_span)
from ddtrace.profiling import Profiler
class OtelRecorder(Recorder):
    """
    Debugging recorder: prints the span ids attached to stack-based profiler
    events so it can be checked whether the correct trace/span id is set,
    then records the events as usual.
    """

    def push_events(self, events: list[StackBasedEvent]):
        """Print every stack-based event that carries a span id, then delegate to the parent recorder."""
        for event in events or ():
            if not (isinstance(event, StackBasedEvent) and event.span_id):
                continue
            print(
                f"Recording event {event.name} with span {event.span_id} and local root {event.local_root_span_id}"
            )
        return super().push_events(events)
# Swap the Recorder class on the ddtrace recorder module for the printing variant.
# NOTE(review): whether the Profiler built below picks up this replacement depends
# on how ddtrace looks up Recorder internally - confirm.
ddtrace.profiling.recorder.Recorder = OtelRecorder
# Build the Datadog profiler with the OpenTelemetry-backed tracer stand-in.
# ENVIRONMENT, SERVICE, VERSION and TRACE_AGENT_HOST are not defined in this
# excerpt; presumably they come from the other Django settings - confirm.
prof = Profiler(
env=ENVIRONMENT,
service=SERVICE,
version=VERSION,
url=f"http://{TRACE_AGENT_HOST}:8126",
tracer=OtelToDdTracer()
)
# Starting the profiler here is a module-level side effect of importing this file.
prof.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment