Created
May 3, 2024 14:46
-
-
Save dasiths/fdd04ab9f55f6b95806d2a878a9264cb to your computer and use it in GitHub Desktop.
Promptflow OpenTelemetry Custom Tracing Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import threading | |
from typing import Dict, List | |
from azure.monitor.opentelemetry.exporter import ( | |
AzureMonitorLogExporter, | |
AzureMonitorTraceExporter, | |
) | |
from flow_nodes.core.ConfigProvider import ( | |
IsDevelopmentEnvironment, | |
get_appinsights_connection_string, | |
get_environment, | |
get_service_instance, | |
get_service_name, | |
get_unfiltered_appinsights_connection_string, | |
should_filter_pii_in_development_environment, | |
should_log_to_console_in_development_environment, | |
) | |
from flow_nodes.core.span_processors.HttpRequestFilteringSpanProcessor import ( | |
HttpRequestFilteringSpanProcessor, | |
) | |
from flow_nodes.core.span_processors.LlmFilteringSpanProcessor import ( | |
LlmFilteringSpanProcessor, | |
) | |
from flow_nodes.core.span_processors.NodeFilteringSpanProcessor import ( | |
NodeFilteringSpanProcessor, | |
) | |
from flow_nodes.core.span_processors.SensitiveDataFilteringSpanProcessor import ( | |
SensitiveDataFilteringSpanProcessor, | |
) | |
from opentelemetry import trace | |
from opentelemetry._logs import get_logger_provider, set_logger_provider | |
from opentelemetry._logs._internal import _PROXY_LOGGER_PROVIDER | |
from opentelemetry.instrumentation.requests import RequestsInstrumentor | |
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler | |
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor, ConsoleLogExporter | |
from opentelemetry.sdk.trace import ( | |
SynchronousMultiSpanProcessor, | |
Tracer, | |
TracerProvider, | |
) | |
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter | |
def get_resource():
    """Build the OpenTelemetry ``Resource`` that identifies this service.

    The resource carries two attributes, ``service.name`` and
    ``service.instance.id``, whose values come from ``get_service_name()``
    and ``get_service_instance()`` respectively.
    """
    # Imported at call time, exactly as the original block did.
    from opentelemetry.sdk.resources import Resource

    # Resource semantic conventions https://opentelemetry.io/docs/specs/semconv/resource/
    service_attributes = {
        "service.name": get_service_name(),
        "service.instance.id": get_service_instance(),
    }
    return Resource.create(service_attributes)
def setup_telemetry():
    """Configure tracing and logging, then return this module's tracer.

    Steps, in order:

    1. Obtain the global tracer provider. If none is set (or it is still the
       proxy placeholder) a new ``TracerProvider`` is created and installed;
       otherwise the existing provider's span processors are shut down and
       replaced with an empty ``SynchronousMultiSpanProcessor``.
    2. Register span processors:
       * development environment: a console exporter, only when console
         logging is enabled, optionally wrapped in the filtering processors;
       * any other environment: one Azure Monitor exporter wrapped in the
         filtering processors plus a second, unwrapped Azure Monitor
         exporter using the "unfiltered" connection string.
    3. Instrument ``requests``, set up logging, and log two info messages.
    """

    def with_filters(exporting_processor):
        # Nesting order, outermost first: Node -> SensitiveData ->
        # HttpRequest -> Llm -> the processor that actually exports.
        return NodeFilteringSpanProcessor(
            SensitiveDataFilteringSpanProcessor(
                HttpRequestFilteringSpanProcessor(
                    LlmFilteringSpanProcessor(exporting_processor)
                )
            )
        )

    resource = get_resource()
    tracer_provider: TracerProvider = trace.get_tracer_provider()
    if tracer_provider and tracer_provider != trace._PROXY_TRACER_PROVIDER:
        # A real provider is already installed: drop whatever processors it
        # had so that the ones registered below are the only ones active.
        try:
            tracer_provider._active_span_processor.shutdown()
        except Exception as e:
            print(f"Error clearing up existing span processors {str(e)}")
        tracer_provider._active_span_processor = SynchronousMultiSpanProcessor()
    else:
        tracer_provider = TracerProvider(resource=resource)
        trace.set_tracer_provider(tracer_provider)

    if not IsDevelopmentEnvironment():
        # setup filtered span processor
        filtered_export = BatchSpanProcessor(
            AzureMonitorTraceExporter(
                connection_string=get_appinsights_connection_string()
            )
        )
        tracer_provider.add_span_processor(with_filters(filtered_export))

        # setup unfiltered span processor
        unfiltered_export = BatchSpanProcessor(
            AzureMonitorTraceExporter(
                connection_string=get_unfiltered_appinsights_connection_string()
            )
        )
        tracer_provider.add_span_processor(unfiltered_export)
    elif should_log_to_console_in_development_environment():
        console_export = BatchSpanProcessor(ConsoleSpanExporter())
        if should_filter_pii_in_development_environment():
            console_export = with_filters(console_export)
        tracer_provider.add_span_processor(console_export)

    module_tracer = trace.get_tracer(__name__)
    RequestsInstrumentor().instrument()

    setup_logging()
    log = get_logger()
    log.info("OTEL setup complete")
    log.info(f"ENVIRONMENT={get_environment()}")
    return module_tracer
def setup_logging():
    """Install a logger provider (if needed) and attach one log exporter.

    A new ``LoggerProvider`` is created and registered globally only when no
    provider is set or the current one is the proxy placeholder. Then:

    * development environment: a ``ConsoleLogExporter`` is attached, but only
      when console logging is enabled; otherwise nothing is attached;
    * any other environment: an ``AzureMonitorLogExporter`` built from the
      App Insights connection string is attached.
    """
    resource = get_resource()
    provider = get_logger_provider()
    if not provider or provider == _PROXY_LOGGER_PROVIDER:
        provider = LoggerProvider(resource=resource)
        set_logger_provider(provider)

    # Choose the exporter first, then register it in a single place.
    if not IsDevelopmentEnvironment():
        exporter = AzureMonitorLogExporter.from_connection_string(
            get_appinsights_connection_string()
        )
    elif should_log_to_console_in_development_environment():
        exporter = ConsoleLogExporter()
    else:
        # Development without console logging: no log exporter at all.
        return

    provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
def get_logger():
    """Return this module's logger, wired to OpenTelemetry at INFO level.

    An OpenTelemetry ``LoggingHandler`` is attached to the logger named after
    this module the first time the function is called. Later calls reuse the
    handler that is already attached instead of adding another one; stacking
    handlers would make every log record be exported once per call made so
    far.

    Returns:
        The ``logging.Logger`` for ``__name__`` with its level set to INFO.
    """
    logger = logging.getLogger(__name__)

    # Attach LoggingHandler to namespaced logger, but only once.
    already_attached = any(
        isinstance(existing, LoggingHandler) for existing in logger.handlers
    )
    if not already_attached:
        logger.addHandler(LoggingHandler())

    logger.setLevel(logging.INFO)
    return logger
def add_reasoning_event(
    event: str,
    attribute: str | Dict | int | List = None,
    print_to_console: bool = False,
) -> None:
    """Record a reasoning event that contains sensitive customer data.

    A dedicated span named ``reasoning_event`` is started for every call. The
    span is tagged with ``event.message`` (the event text) and
    ``contains_sensitive_data=True``, and receives one span event whose name
    is ``event`` and whose single attribute maps ``event`` to
    ``str(attribute)``.

    Args:
        event: Event text; used as span-event name and as attribute key.
        attribute: Payload to attach; it is always converted with ``str``.
        print_to_console: When true, also ``print`` the event text.
    """
    with get_tracer().start_as_current_span("reasoning_event") as reasoning_span:
        reasoning_span.set_attribute("event.message", event)
        reasoning_span.set_attribute("contains_sensitive_data", True)
        event_attributes = {event: str(attribute)}
        reasoning_span.add_event(name=event, attributes=event_attributes)

    if not print_to_console:
        return
    print(event)
def add_logging_event(
    event: str,
    has_pii: bool = False,
    attribute: str | Dict | int | List | None = None,
    print_to_console: bool = False,
) -> None:
    """Record a generic logging event as a span event.

    The event is added to the currently active span when that span is
    recording. When it is not (in particular when no span is active at all),
    a new span named ``logging_event`` is started from ``get_tracer()`` and
    the event is added to that span instead.

    Args:
        event: Event text; used as span-event name and as attribute key.
        has_pii: When true, the receiving span is tagged with
            ``contains_sensitive_data=True``.
        attribute: Payload to attach; it is always converted with ``str``.
        print_to_console: When true, also ``print`` the event text.
    """
    event_attributes = {event: str(attribute)}

    def record_on(target_span) -> None:
        if has_pii:
            target_span.set_attribute("contains_sensitive_data", True)
        target_span.add_event(name=event, attributes=event_attributes)

    current_span = trace.get_current_span()
    # get_current_span() never returns None: without an active span it hands
    # back a non-recording placeholder that is still truthy. A plain
    # truthiness test therefore always passed and the event was silently
    # discarded; is_recording() is what tells the two situations apart.
    if current_span.is_recording():
        record_on(current_span)
    else:
        with get_tracer().start_as_current_span("logging_event") as fallback_span:
            record_on(fallback_span)

    if print_to_console:
        print(event)
# global variables
# Tracer shared by the whole module; stays None until the first get_tracer() call.
_tracer: Tracer = None
# Serialises the one-time call to setup_telemetry().
_lock = threading.Lock()


def get_tracer() -> Tracer:
    """Return the shared tracer, running ``setup_telemetry()`` on first use.

    The unlocked check lets callers skip the lock once the tracer exists;
    the second check under the lock stops a second thread from running the
    setup again after waiting for the first one to finish.
    """
    global _tracer
    # double checked lock pattern https://en.wikipedia.org/wiki/Double-checked_locking
    if _tracer is not None:
        return _tracer

    with _lock:
        if _tracer is None:
            _tracer = setup_telemetry()
        return _tracer
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Span processor that filters the node inputs/outputs auto-instrumented by Promptflow.