Skip to content

Instantly share code, notes, and snippets.

@Oberon00
Created January 10, 2020 16:41
Show Gist options
  • Save Oberon00/5c30bddfbb438129bcebee9190500bf1 to your computer and use it in GitHub Desktop.
Save Oberon00/5c30bddfbb438129bcebee9190500bf1 to your computer and use it in GitHub Desktop.
OpenTelemetry-Python protobuf exporter
# https://github.com/open-telemetry/opentelemetry-java/blob/91b6c1fe41d0df52b14541a6ad0866195f1a90c9/api/src/main/java/io/opentelemetry/trace/SpanId.java#L121-L123
_IDENTIFIER_BYTE_ORDER = "big"
def _set_pb_attrval(attrpb, attrval):
"""Convert the attributes to otel tags (based on the jaeger exporter)."""
if isinstance(attrval, bool):
attrpb.type = common_pb2.AttributeKeyValue.ValueType.BOOL
attrpb.bool_value = attrval
elif isinstance(attrval, str):
attrpb.type = common_pb2.AttributeKeyValue.ValueType.STRING
attrpb.string_value = attrval
if isinstance(attrval, int):
attrpb.type = common_pb2.AttributeKeyValue.ValueType.INT
attrpb.int_value = attrval
if isinstance(attrval, float):
attrpb.type = common_pb2.AttributeKeyValue.ValueType.DOUBLE
attrpb.double_value = attrval
return None
def _set_pb_attributes(attrpb, attrdict):
for key, value in attrdict.items():
attrentrypb = attrpb.add()
_set_pb_attrval(attrentrypb, value)
def _set_pb_ctxlike(ctxpb, ctx):
ctxpb.trace_id = ctx.trace_id.to_bytes(128 // 8, _IDENTIFIER_BYTE_ORDER)
ctxpb.span_id = ctx.span_id.to_bytes(64 // 8, _IDENTIFIER_BYTE_ORDER)
for key, value in ctx.trace_state.items():
entrypb = ctxpb.tracestate.entries.add()
entrypb.key = key
entrypb.value = value
class PbSpanExporter(SpanExporter):
"""Implementation of :class:`SpanExporter` that exports spans using otel protobuf.
"""
def __init__(
self, endpoint_url: str, insecure_disable_tls_verification: bool = False
):
self.endpoint_url = endpoint_url
self.session = requests.Session()
self.verify_tls = not insecure_disable_tls_verification
def do_export(
self, spans: typing.Sequence[trace.Span]
) -> SpanExportResult:
if not spans:
return SpanExportResult.SUCCESS
export_request = trace_service_pb2.ExportTraceServiceRequest()
resource_span = export_request.resource_spans.add()
for span in spans:
spanpb = resource_span.spans.add()
if span.parent is not None:
pctx = (
span.parent
if isinstance(span.parent, trace.SpanContext)
else span.parent.get_context()
)
spanpb.parent_span_id = pctx.span_id.to_bytes(
64 // 8, _IDENTIFIER_BYTE_ORDER
)
_set_pb_ctxlike(spanpb, span.get_context())
spanpb.name = span.name
spanpb.kind = getattr(trace_pb2.Span.SpanKind, span.kind.name)
spanpb.start_time_unixnano = span.start_time
if span.end_time is not None:
spanpb.end_time_unixnano = span.end_time
_set_pb_attributes(spanpb.attributes, span.attributes)
for event in span.events:
eventpb = spanpb.time_events.timed_event.add()
eventpb.time.FromNanoseconds(event.timestamp)
eventpb.event.name = event.name
_set_pb_attributes(eventpb.attributes, event.attributes)
for link in span.links:
linkpb = spanpb.links.link.add()
_set_pb_ctxlike(spanpb, span.get_context())
_set_pb_attributes(linkpb.attributes, link.attributes)
spanpb.status.code = span.status.canonical_code.value
if span.status.description is not None:
spanpb.status.message = span.status.description
print(span) # TODO
with self.session.post(
self.endpoint_url,
headers={"Content-Type": "application/x-otel-binary"}, # TODO
verify=self.verify_tls,
data=export_request.SerializeToString(),
) as resp:
resp.raise_for_status()
return SpanExportResult.SUCCESS
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment