Skip to content

Instantly share code, notes, and snippets.

@dasiths
Created May 3, 2024 14:46
Show Gist options
  • Save dasiths/fdd04ab9f55f6b95806d2a878a9264cb to your computer and use it in GitHub Desktop.
Save dasiths/fdd04ab9f55f6b95806d2a878a9264cb to your computer and use it in GitHub Desktop.
Promptflow OpenTelemetry Custom Tracing Example
import logging
import threading
from typing import Dict, List
from azure.monitor.opentelemetry.exporter import (
AzureMonitorLogExporter,
AzureMonitorTraceExporter,
)
from flow_nodes.core.ConfigProvider import (
IsDevelopmentEnvironment,
get_appinsights_connection_string,
get_environment,
get_service_instance,
get_service_name,
get_unfiltered_appinsights_connection_string,
should_filter_pii_in_development_environment,
should_log_to_console_in_development_environment,
)
from flow_nodes.core.span_processors.HttpRequestFilteringSpanProcessor import (
HttpRequestFilteringSpanProcessor,
)
from flow_nodes.core.span_processors.LlmFilteringSpanProcessor import (
LlmFilteringSpanProcessor,
)
from flow_nodes.core.span_processors.NodeFilteringSpanProcessor import (
NodeFilteringSpanProcessor,
)
from flow_nodes.core.span_processors.SensitiveDataFilteringSpanProcessor import (
SensitiveDataFilteringSpanProcessor,
)
from opentelemetry import trace
from opentelemetry._logs import get_logger_provider, set_logger_provider
from opentelemetry._logs._internal import _PROXY_LOGGER_PROVIDER
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter
from opentelemetry.sdk.trace import (
SynchronousMultiSpanProcessor,
Tracer,
TracerProvider,
)
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
def get_resource():
    """Build the OpenTelemetry Resource identifying this service instance.

    Resource semantic conventions:
    https://opentelemetry.io/docs/specs/semconv/resource/
    """
    from opentelemetry.sdk.resources import Resource

    attributes = {
        "service.name": get_service_name(),
        "service.instance.id": get_service_instance(),
    }
    return Resource.create(attributes)
def setup_telemetry():
    """Configure the global tracer provider, span processors and logging.

    Reuses an already-installed provider (shutting down its existing span
    processors first) so repeated calls do not double-export. Returns the
    tracer for this module.
    """

    def _with_pii_filters(processor):
        # Wrap an exporter processor in the full redaction chain:
        # node -> sensitive-data -> http-request -> llm filtering.
        return NodeFilteringSpanProcessor(
            SensitiveDataFilteringSpanProcessor(
                HttpRequestFilteringSpanProcessor(
                    LlmFilteringSpanProcessor(processor)
                )
            )
        )

    resource = get_resource()
    provider: TracerProvider = trace.get_tracer_provider()
    if not provider or provider == trace._PROXY_TRACER_PROVIDER:
        # No real provider installed yet - create and register one.
        provider = TracerProvider(resource=resource)
        trace.set_tracer_provider(provider)
    else:
        # A provider already exists (e.g. setup re-run): shut down its
        # current processors and start from a clean slate.
        try:
            provider._active_span_processor.shutdown()
        except Exception as e:
            print(f"Error clearing up existing span processors {str(e)}")
        provider._active_span_processor = SynchronousMultiSpanProcessor()

    if IsDevelopmentEnvironment():
        if should_log_to_console_in_development_environment():
            console_processor = BatchSpanProcessor(ConsoleSpanExporter())
            if should_filter_pii_in_development_environment():
                provider.add_span_processor(_with_pii_filters(console_processor))
            else:
                provider.add_span_processor(console_processor)
    else:
        # Filtered pipeline: PII is stripped before reaching App Insights.
        filtered_processor = BatchSpanProcessor(
            AzureMonitorTraceExporter(
                connection_string=get_appinsights_connection_string()
            )
        )
        provider.add_span_processor(_with_pii_filters(filtered_processor))

        # Unfiltered pipeline: raw spans go to a separate App Insights
        # instance (presumably access-restricted - confirm with config).
        unfiltered_processor = BatchSpanProcessor(
            AzureMonitorTraceExporter(
                connection_string=get_unfiltered_appinsights_connection_string()
            )
        )
        provider.add_span_processor(unfiltered_processor)

    tracer = trace.get_tracer(__name__)
    RequestsInstrumentor().instrument()

    setup_logging()
    logger = get_logger()
    logger.info("OTEL setup complete")
    logger.info(f"ENVIRONMENT={get_environment()}")
    return tracer
def setup_logging():
    """Install a global OTEL LoggerProvider with the environment's exporter.

    Development: console exporter (when enabled by config).
    Otherwise: Azure Monitor exporter using the filtered connection string.
    """
    resource = get_resource()
    logging_provider = get_logger_provider()
    if not logging_provider or logging_provider == _PROXY_LOGGER_PROVIDER:
        # Only the proxy is installed - replace it with a real provider.
        logging_provider = LoggerProvider(resource=resource)
        set_logger_provider(logging_provider)

    if IsDevelopmentEnvironment():
        if should_log_to_console_in_development_environment():
            console_exporter = ConsoleLogExporter()
            logging_provider.add_log_record_processor(
                BatchLogRecordProcessor(console_exporter)
            )
    else:
        azure_exporter = AzureMonitorLogExporter.from_connection_string(
            get_appinsights_connection_string()
        )
        logging_provider.add_log_record_processor(
            BatchLogRecordProcessor(azure_exporter)
        )
def get_logger():
    """Return this module's logger, wired to the OTEL logging pipeline.

    Fix: the original attached a fresh LoggingHandler on every call, so each
    call after the first caused duplicate log records. Attach the handler
    only if one is not already present.
    """
    logger = logging.getLogger(__name__)
    # Guard against duplicate handlers on repeated calls.
    if not any(isinstance(h, LoggingHandler) for h in logger.handlers):
        logger.addHandler(LoggingHandler())
    logger.setLevel(logging.INFO)
    return logger
def add_reasoning_event(
    event: str,
    attribute: str | Dict | int | List = None,
    print_to_console: bool = False,
) -> None:
    """
    Use this method to record sensitive reasoning events with customer data
    """
    tracer = get_tracer()
    with tracer.start_as_current_span("reasoning_event") as span:
        span.set_attribute("event.message", event)
        # Mark so downstream PII-filtering span processors can redact it.
        span.set_attribute("contains_sensitive_data", True)
        span.add_event(name=event, attributes={event: str(attribute)})
    if print_to_console:
        print(event)
def add_logging_event(
    event: str,
    has_pii: bool = False,
    attribute: str | Dict | int | List = None,
    print_to_console: bool = False,
) -> None:
    """
    Use this method to record generic logging events.

    Attaches the event to the current span when one is active; otherwise
    creates a dedicated "logging_event" span so the event is not lost.
    """
    attributes = {event: str(attribute)}
    span = trace.get_current_span()
    # Fix: get_current_span() returns INVALID_SPAN (a truthy
    # NonRecordingSpan) when no span is active, so the original `if span:`
    # was always true and events outside a span were silently attached to a
    # non-recording span. Check the span context validity instead.
    if span is not None and span.get_span_context().is_valid:
        if has_pii:
            span.set_attribute("contains_sensitive_data", True)
        span.add_event(name=event, attributes=attributes)
    else:
        with get_tracer().start_as_current_span("logging_event") as span:
            if has_pii:
                span.set_attribute("contains_sensitive_data", True)
            span.add_event(name=event, attributes=attributes)
    if print_to_console:
        print(event)
# global variables
_tracer: Tracer = None
_lock = threading.Lock()


def get_tracer() -> Tracer:
    """Return the process-wide tracer, running setup_telemetry() on first use.

    Double-checked locking ensures concurrent first callers initialize
    telemetry exactly once.
    https://en.wikipedia.org/wiki/Double-checked_locking
    """
    global _tracer
    if _tracer is not None:
        return _tracer
    with _lock:
        if _tracer is None:
            _tracer = setup_telemetry()
    return _tracer
@dasiths
Copy link
Author

dasiths commented May 3, 2024

span processor to remove the input/output messages from the LLM node

import json
import typing

from opentelemetry import context as context_api
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor

# Span name emitted by the OpenAI SDK instrumentation for chat-completion
# calls; spans with this name carry the LLM request/response attributes.
_openai_sdk_instrumentation_span_name = (
    "openai.resources.chat.completions.Completions.create"
)

# This span processor is used to filter GPT-specific span input/output.
# More about span processors here https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#span-processor
# The span is created by PF and the logic behind the span attributes are similar to this https://github.com/cartermp/opentelemetry-instrument-openai-py/blob/main/src/opentelemetry/instrumentation/openai/__init__.py


class LlmFilteringSpanProcessor(SpanProcessor):
    """Strips LLM message content from OpenAI-SDK spans before export.

    Wraps an inner SpanProcessor; on_end, spans whose name matches the
    OpenAI chat-completions instrumentation have their "inputs" messages
    and "output" choice contents blanked, then the (possibly replaced)
    span is forwarded to the inner processor.

    Fixes over the original: `if modified:` instead of `is True`, and
    json parse failures now redact the attribute wholesale instead of
    raising out of the export path (or leaking unparseable payloads).
    """

    def __init__(self, inner_processor: SpanProcessor) -> None:
        self.inner_processor = inner_processor

    def on_start(
        self, span: Span, parent_context: typing.Optional[context_api.Context] = None
    ) -> None:
        self.inner_processor.on_start(span, parent_context)

    def on_end(self, span: ReadableSpan) -> None:
        if span.name == _openai_sdk_instrumentation_span_name:
            modified = False
            original_inputs = span.attributes.get("inputs")
            original_output = span.attributes.get("output")
            modified_inputs = original_inputs
            modified_output = original_output

            if original_inputs:
                modified = True
                try:
                    # Blank the chat messages but keep the rest of the
                    # request payload (model, temperature, ...).
                    input_data = json.loads(original_inputs)
                    input_data["messages"] = []
                    modified_inputs = json.dumps(input_data, indent=2)
                except (json.JSONDecodeError, TypeError):
                    # Unparseable payload: redact entirely rather than
                    # leak it or crash the export pipeline.
                    modified_inputs = "{redacted}"

            if original_output:
                modified = True
                try:
                    output_data = json.loads(original_output)
                    for choice in output_data.get("choices", []):
                        # Missing "message" yields a throwaway dict; the
                        # assignment is then a harmless no-op.
                        choice.get("message", {})["content"] = ""
                    modified_output = json.dumps(output_data, indent=2)
                except (json.JSONDecodeError, TypeError):
                    modified_output = "{redacted}"

            if modified:
                span = self.create_new_readable_input_span(
                    span, modified_inputs, modified_output
                )

        self.inner_processor.on_end(span)

    def create_new_readable_input_span(
        self, span: ReadableSpan, modified_inputs: str, modified_output: str
    ) -> ReadableSpan:
        """Clone *span*, overriding its "inputs"/"output" attributes.

        ReadableSpan attributes are immutable, so redaction requires
        building a replacement span.
        """
        new_span = ReadableSpan(
            name=span.name,
            context=span.context,
            parent=span.parent,
            resource=span.resource,
            kind=span.kind,
            start_time=span.start_time,
            end_time=span.end_time,
            attributes={
                **span.attributes,
                "inputs": modified_inputs,
                "output": modified_output,
            },
            events=span.events,
            links=span.links,
            status=span.status,
            instrumentation_info=span.instrumentation_info,
            instrumentation_scope=span.instrumentation_scope,
        )

        return new_span

    def shutdown(self) -> None:
        self.inner_processor.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return self.inner_processor.force_flush(timeout_millis)

@dasiths
Copy link
Author

dasiths commented May 3, 2024

span processor to filter the node input/output auto instrumented by promptflow

import typing

from opentelemetry import context as context_api
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor

# Attribute keys/values that promptflow sets on tool-node spans; spans
# matching both are the ones whose inputs/outputs get redacted below.
_node_name = "node_name"
_span_type_name = "span_type"
_span_type_value = "Tool"

# This span processor is used to filter the inputs and outputs of nodes.
# More about span processors here https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/sdk.md#span-processor


class NodeFilteringSpanProcessor(SpanProcessor):
    """Redacts the inputs/outputs of promptflow Tool-node spans.

    Delegates every lifecycle call to an inner SpanProcessor; on_end,
    any span carrying a node_name whose span_type is "Tool" is replaced
    by a copy whose "inputs"/"output" attributes read "{redacted}".
    """

    def __init__(self, inner_processor: SpanProcessor) -> None:
        self.inner_processor = inner_processor

    def on_start(
        self, span: Span, parent_context: typing.Optional[context_api.Context] = None
    ) -> None:
        self.inner_processor.on_start(span, parent_context)

    def on_end(self, span: ReadableSpan) -> None:
        attrs = span.attributes
        is_tool_node = (
            attrs.get(_node_name) is not None
            and attrs.get(_span_type_name) == _span_type_value
        )
        if is_tool_node:
            span = self.create_new_readable_input_span(span, "{redacted}", "{redacted}")
        self.inner_processor.on_end(span)

    def create_new_readable_input_span(
        self, span: ReadableSpan, modified_inputs: str, modified_output: str
    ) -> ReadableSpan:
        """Clone *span*, overriding its "inputs"/"output" attributes."""
        return ReadableSpan(
            name=span.name,
            context=span.context,
            parent=span.parent,
            resource=span.resource,
            kind=span.kind,
            start_time=span.start_time,
            end_time=span.end_time,
            attributes={
                **span.attributes,
                "inputs": modified_inputs,
                "output": modified_output,
            },
            events=span.events,
            links=span.links,
            status=span.status,
            instrumentation_info=span.instrumentation_info,
            instrumentation_scope=span.instrumentation_scope,
        )

    def shutdown(self) -> None:
        self.inner_processor.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        return self.inner_processor.force_flush(timeout_millis)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment