Created
January 10, 2020 16:41
-
-
Save Oberon00/5c30bddfbb438129bcebee9190500bf1 to your computer and use it in GitHub Desktop.
OpenTelemetry-Python protobuf exporter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Byte order passed to ``int.to_bytes`` whenever a trace ID or span ID is
# written into a protobuf message in this module.
# See (presumably the reference for this choice -- confirm):
# https://github.com/open-telemetry/opentelemetry-java/blob/91b6c1fe41d0df52b14541a6ad0866195f1a90c9/api/src/main/java/io/opentelemetry/trace/SpanId.java#L121-L123
_IDENTIFIER_BYTE_ORDER = "big"
def _set_pb_attrval(attrpb, attrval):
    """Convert the attributes to otel tags (based on the jaeger exporter).

    Sets ``attrpb.type`` and the matching ``*_value`` field according to the
    Python type of ``attrval``. Exactly one type/value pair is written per
    call; values that are not ``bool``, ``str``, ``int`` or ``float`` leave
    ``attrpb`` untouched.

    Args:
        attrpb: Attribute message to fill in (its ``type`` and one of
            ``bool_value``/``string_value``/``int_value``/``double_value``
            are assigned).
        attrval: The attribute value to convert.

    Returns:
        Always ``None``; the result is written into ``attrpb``.
    """
    # ``bool`` is a subclass of ``int``, so it must be tested first and the
    # branches must be mutually exclusive. With independent ``if`` statements
    # a boolean would also match the ``int`` branch and be re-tagged as INT.
    if isinstance(attrval, bool):
        attrpb.type = common_pb2.AttributeKeyValue.ValueType.BOOL
        attrpb.bool_value = attrval
    elif isinstance(attrval, str):
        attrpb.type = common_pb2.AttributeKeyValue.ValueType.STRING
        attrpb.string_value = attrval
    elif isinstance(attrval, int):
        attrpb.type = common_pb2.AttributeKeyValue.ValueType.INT
        attrpb.int_value = attrval
    elif isinstance(attrval, float):
        attrpb.type = common_pb2.AttributeKeyValue.ValueType.DOUBLE
        attrpb.double_value = attrval
    return None
def _set_pb_attributes(attrpb, attrdict):
    """Append one protobuf attribute entry per item of ``attrdict``.

    Args:
        attrpb: Repeated attribute field; ``add()`` is called once for every
            item and the returned entry is filled in.
        attrdict: Mapping of attribute key to attribute value. ``None`` or an
            empty mapping adds nothing.
    """
    # Spans, events and links without attributes have nothing to export.
    if not attrdict:
        return
    for key, value in attrdict.items():
        attrentrypb = attrpb.add()
        # Record the attribute name alongside its typed value; an entry
        # without a key cannot be attributed to anything by the receiver.
        attrentrypb.key = key
        _set_pb_attrval(attrentrypb, value)
def _set_pb_ctxlike(ctxpb, ctx):
    """Copy the identifiers and trace state of ``ctx`` into ``ctxpb``.

    ``ctxpb`` may be any message that exposes ``trace_id``, ``span_id`` and
    ``tracestate.entries``. The trace ID is written as 16 bytes and the span
    ID as 8 bytes, both using ``_IDENTIFIER_BYTE_ORDER``. One tracestate
    entry is appended per item of ``ctx.trace_state``.
    """
    trace_id_size = 128 // 8
    span_id_size = 64 // 8

    ctxpb.trace_id = ctx.trace_id.to_bytes(
        trace_id_size, _IDENTIFIER_BYTE_ORDER
    )
    ctxpb.span_id = ctx.span_id.to_bytes(span_id_size, _IDENTIFIER_BYTE_ORDER)

    for state_key, state_value in ctx.trace_state.items():
        state_entry = ctxpb.tracestate.entries.add()
        state_entry.key = state_key
        state_entry.value = state_value
class PbSpanExporter(SpanExporter):
    """Implementation of :class:`SpanExporter` that exports spans using otel protobuf.

    Spans are converted into a single ``ExportTraceServiceRequest`` and sent
    with an HTTP POST to ``endpoint_url`` through a ``requests`` session that
    is created once per exporter.
    """

    def __init__(
        self, endpoint_url: str, insecure_disable_tls_verification: bool = False
    ):
        """Create the exporter.

        Args:
            endpoint_url: URL the serialized export request is POSTed to.
            insecure_disable_tls_verification: When true, requests are sent
                with ``verify=False``.
        """
        self.endpoint_url = endpoint_url
        self.session = requests.Session()
        self.verify_tls = not insecure_disable_tls_verification

    def do_export(
        self, spans: typing.Sequence[trace.Span]
    ) -> SpanExportResult:
        """Serialize ``spans`` into one request and POST it to the endpoint.

        Args:
            spans: The spans to export. An empty sequence is reported as a
                success without building or sending a request.

        Returns:
            ``SpanExportResult.SUCCESS`` once the endpoint has answered with
            a non-error status.

        Raises:
            Whatever ``resp.raise_for_status()`` raises for an HTTP error
            status, plus any exception raised by ``session.post`` itself.
        """
        if not spans:
            return SpanExportResult.SUCCESS

        export_request = trace_service_pb2.ExportTraceServiceRequest()
        resource_span = export_request.resource_spans.add()
        for span in spans:
            spanpb = resource_span.spans.add()

            if span.parent is not None:
                # The parent is either a SpanContext or an object that can
                # produce one via ``get_context()``.
                pctx = (
                    span.parent
                    if isinstance(span.parent, trace.SpanContext)
                    else span.parent.get_context()
                )
                spanpb.parent_span_id = pctx.span_id.to_bytes(
                    64 // 8, _IDENTIFIER_BYTE_ORDER
                )

            _set_pb_ctxlike(spanpb, span.get_context())
            spanpb.name = span.name
            # The protobuf enum member is looked up by the name of the
            # span's kind.
            spanpb.kind = getattr(trace_pb2.Span.SpanKind, span.kind.name)
            spanpb.start_time_unixnano = span.start_time
            if span.end_time is not None:
                spanpb.end_time_unixnano = span.end_time
            _set_pb_attributes(spanpb.attributes, span.attributes)

            for event in span.events:
                eventpb = spanpb.time_events.timed_event.add()
                eventpb.time.FromNanoseconds(event.timestamp)
                eventpb.event.name = event.name
                _set_pb_attributes(eventpb.attributes, event.attributes)

            for link in span.links:
                linkpb = spanpb.links.link.add()
                # Fill the link entry from the linked context. Writing the
                # span's own context here would leave the link without IDs
                # and append duplicate tracestate entries to the span.
                _set_pb_ctxlike(linkpb, link.context)
                _set_pb_attributes(linkpb.attributes, link.attributes)

            spanpb.status.code = span.status.canonical_code.value
            if span.status.description is not None:
                spanpb.status.message = span.status.description

        with self.session.post(
            self.endpoint_url,
            headers={"Content-Type": "application/x-otel-binary"},  # TODO
            verify=self.verify_tls,
            data=export_request.SerializeToString(),
        ) as resp:
            resp.raise_for_status()
        return SpanExportResult.SUCCESS
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment