Skip to content

Instantly share code, notes, and snippets.

@cmw2
Created November 1, 2023 16:10
Show Gist options
  • Save cmw2/0364d7b7eeea482f8b058ed001d1ec3a to your computer and use it in GitHub Desktop.
Save cmw2/0364d7b7eeea482f8b058ed001d1ec3a to your computer and use it in GitHub Desktop.
Add Azure Monitor OpenTelemetry to Sample AOAISearchDemo
# CMW: This is in app/backend
import datetime
import json
import mimetypes
import yaml
from azure.core.credentials import AzureKeyCredential
from azure.identity import DefaultAzureCredential
from azure.search.documents import SearchClient
from azure.storage.blob import BlobServiceClient
from backend.approaches.approach import Approach
from backend.approaches.approach_classifier import ApproachClassifier
from backend.approaches.chatstructured import ChatStructuredApproach
from backend.approaches.chatunstructured import ChatUnstructuredApproach
from backend.cognition.openai_client import OpenAIClient
from backend.config import DefaultConfig
from backend.contracts.chat_response import Answer, ApproachType, ChatResponse
from backend.contracts.error import (
ContentFilterException,
OutOfScopeException,
UnauthorizedDBAccessException,
)
from backend.contracts.search_settings import SearchSettings
from backend.data_client.data_client import DataClient
from backend.utilities.access_management import AccessManager
from common.contracts.chat_session import (
ChatSession,
DialogClassification,
ParticipantType,
)
from flask import Flask, jsonify, request
# CMW: Added this section for AZMon and OpenTelemetry
from opentelemetry import trace
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorTraceExporter
# Use the current user identity to authenticate with Azure OpenAI, Cognitive Search and Blob Storage (no secrets needed,
# just use 'az login' locally, and managed identity when deployed on Azure). If you need to use keys, use separate AzureKeyCredential instances with the
# keys for each service
# If you encounter a blocking error during a DefaultAzureCredential resolution, you can exclude the problematic credential by using a parameter (ex. exclude_shared_token_cache_credential=True)
# Load configuration (env vars / Key Vault) before any Azure client is constructed.
DefaultConfig.initialize()
azure_credential = DefaultAzureCredential()
# NOTE(review): search uses a key credential rather than the managed identity above.
search_credential = AzureKeyCredential(DefaultConfig.AZURE_SEARCH_KEY)
openai_client = OpenAIClient()
# Set up clients for Cognitive Search and Storage
search_client = SearchClient(
    endpoint=f"https://{DefaultConfig.AZURE_SEARCH_SERVICE}.search.windows.net",
    index_name=DefaultConfig.AZURE_SEARCH_INDEX,
    credential=search_credential,
)
blob_client = BlobServiceClient.from_connection_string(
    DefaultConfig.AZURE_BLOB_CONNECTION_STRING
)
blob_container = blob_client.get_container_client(DefaultConfig.AZURE_STORAGE_CONTAINER)
# get the logger that is already initialized
logger = DefaultConfig.logger
# Map each approach-type name to the handler that answers that kind of question.
chat_approaches = {
    ApproachType.unstructured.name: ChatUnstructuredApproach(
        search_client,
        DefaultConfig.KB_FIELDS_SOURCEPAGE,
        DefaultConfig.KB_FIELDS_CONTENT,
        logger,
        search_threshold_percentage=DefaultConfig.SEARCH_THRESHOLD_PERCENTAGE,
    ),
    ApproachType.structured.name: ChatStructuredApproach(
        DefaultConfig.SQL_CONNECTION_STRING, logger
    ),
}
# initialize data client
base_uri = DefaultConfig.DATA_SERVICE_URI
data_client = DataClient(base_uri, logger)
approach_classifier = ApproachClassifier(logger)
access_manager = AccessManager()
app = Flask(__name__)
# CMW: Added this code to instrument the flask app
# Spans emitted by this process are tagged with this service name in Azure Monitor.
trace.set_tracer_provider(
    TracerProvider(
        resource=Resource.create({SERVICE_NAME: "backend-service"}),
    )
)
FlaskInstrumentor().instrument_app(app)
RequestsInstrumentor().instrument()
trace_exporter = AzureMonitorTraceExporter(
    connection_string=DefaultConfig.APPLICATION_INSIGHTS_CNX_STR
)
# Export spans to Application Insights in batches rather than one at a time.
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(trace_exporter)
)
@app.route("/", defaults={"path": ""})
@app.route("/<path:path>")
def index(path):
    # Serve the SPA entry point for any path not matched by a more specific route.
    return app.send_static_file("index.html")
@app.route("/assets/<path:rest_of_path>")
def assets(rest_of_path):
    # Serve frontend assets (js/css/images) out of the Flask static folder.
    return app.send_static_file(f"assets/{rest_of_path}")
# Serve content files from blob storage from within the app to keep the example self-contained.
# *** NOTE *** this assumes that the content files are public, or at least that all users of the app
# can access all the files. This is also slow and memory hungry.
@app.route("/content/<path>")
def content_file(path):
    """Stream a single blob back to the client, guessing the MIME type when
    the blob was stored as a generic octet-stream."""
    downloaded = blob_container.get_blob_client(path).download_blob()
    content_type = downloaded.properties["content_settings"]["content_type"]
    if content_type == "application/octet-stream":
        guessed, _ = mimetypes.guess_type(path)
        content_type = guessed or "application/octet-stream"
    headers = {
        "Content-Type": content_type,
        "Content-Disposition": f"inline; filename={path}",
    }
    return downloaded.readall(), 200, headers
# Persist and return a canned reply for dialogs that must not reach the
# knowledge base (chit-chat / inappropriate questions): stores the user turn
# and the canned assistant turn, then returns the JSON response.
def _respond_with_canned_answer(
    user_id, conversation_id, user_message, approach_type, canned_response, properties
):
    # TODO: Use DialogClassification.inappropiate once data service has been updated.
    data_client.add_dialog_to_chat_session(
        user_id,
        conversation_id,
        ParticipantType.user,
        datetime.datetime.now(),
        user_message,
        DialogClassification.chit_chat,
    )
    logger.info(
        f"added dialog to chat session for user {user_id} and session {conversation_id}",
        extra=properties,
    )
    response = ChatResponse(
        answer=Answer(formatted_answer=canned_response),
        classification=approach_type,
    )
    data_client.add_dialog_to_chat_session(
        user_id,
        conversation_id,
        ParticipantType.assistant,
        datetime.datetime.now(),
        json.dumps(response.answer.to_item()),
        DialogClassification.chit_chat,
    )
    logger.info(
        f"added response {canned_response} to chat session for user {user_id} and session {conversation_id}",
        extra=properties,
    )
    return jsonify(response.to_item())


@app.route("/chat", methods=["POST"])
def chat():
    """Answer one user dialog turn.

    Classifies the question, enforces per-user access rules, routes the
    question to the matching approach (structured / unstructured), persists
    both sides of the dialog, and returns the answer as JSON. Errors map to
    403 (unauthorized DB access), 400 (content filter) or 500 (anything else).
    """
    # try get conversation_id and dialog_id needed for logging
    conversation_id = request.json.get(
        "conversation_id", "no conversation_id found in request"
    )
    dialog_id = request.json.get("dialog_id", "no dialog_id found in request")
    user_id = request.json.get("user_id", "no user_id found in request")
    classification_override = None
    overrides = request.json.get("overrides", None)
    if overrides:
        classification_override = overrides.get("classification_override", None)
    # fetch user profile
    user_profile = data_client.get_user_profile(user_id)
    # check user access rules
    allowed_resources = data_client.get_user_resources(user_id)
    allowed_approaches = access_manager.get_allowed_approaches(allowed_resources)
    logger.set_conversation_and_dialog_ids(conversation_id, dialog_id)
    properties = logger.get_updated_properties(
        {"conversation_id": conversation_id, "dialog_id": dialog_id, "user_id": user_id}
    )
    logger.info(f"request: {json.dumps(request.json)}", extra=properties)
    user_message = request.json.get("dialog")
    chat_session: ChatSession
    chat_session_exists = data_client.check_chat_session(user_id, conversation_id)
    if not chat_session_exists:
        chat_session = data_client.create_chat_session(user_id, conversation_id)
        logger.info(
            f"created new chat session for user {user_id} and session {conversation_id}",
            extra=properties,
        )
    else:
        chat_session = data_client.get_chat_session(user_id, conversation_id)
        logger.info(
            f"chat session for user {user_id} and session {conversation_id} already exists",
            extra=properties,
        )
    history = [
        {
            "participant_type": dialog.participant_type.value,
            "utterance": dialog.utterance,
            "question_type": dialog.classification.value,
        }
        for dialog in chat_session.conversation
    ]
    history.append(
        {"participant_type": ParticipantType.user.value, "utterance": user_message}
    )
    # BUGFIX: use a context manager so the config file handle is closed
    # (previously open() was never closed).
    with open("backend/bot_config.yaml", "r") as bot_config_file:
        bot_config = yaml.safe_load(bot_config_file)
    question_classification = None
    try:
        if classification_override:
            approach_type = ApproachType(classification_override)
        else:
            approach_type = approach_classifier.run(history, bot_config, openai_client)
        logger.info(f"question_type: {approach_type.name}", extra=properties)
        if approach_type == ApproachType.chit_chat:
            chit_chat_canned_response = "I'm sorry, but the question you've asked is outside my area of expertise. I'd be happy to help with any questions related to Microsoft Surface PCs and Laptops. Please feel free to ask about those, and I'll do my best to assist you!"
            return _respond_with_canned_answer(
                user_id,
                conversation_id,
                user_message,
                approach_type,
                chit_chat_canned_response,
                properties,
            )
        elif approach_type == ApproachType.inappropriate:
            inappropriate_canned_response = "I'm sorry, but the question you've asked goes against our content safety policy due to harmful, offensive, or illegal content. I'd be happy to help with any questions related to Microsoft Surface PCs and Laptops. Please feel free to ask about those, and I'll do my best to assist you!"
            return _respond_with_canned_answer(
                user_id,
                conversation_id,
                user_message,
                approach_type,
                inappropriate_canned_response,
                properties,
            )
        # check if user is allowed to use the approach
        user_allowed = access_manager.is_user_allowed(allowed_approaches, approach_type)
        if not user_allowed:
            prohibited_resource = access_manager.map_approach_to_resource(approach_type)
            raise Exception(
                f"This query requires access to {prohibited_resource}\nUser: {user_profile.user_name} is not allowed to use this resource, please try another query or contact your administrator."
            )
        question_classification = (
            DialogClassification.unstructured_query
            if approach_type == ApproachType.unstructured
            else DialogClassification.structured_query
        )
        # filtered_chat_session = data_client.filter_chat_session(chat_session, filter=question_classification)
        filtered_chat_session = chat_session
        simplified_history = [
            {
                "participant_type": dialog.participant_type.value,
                "utterance": dialog.utterance,
            }
            for dialog in filtered_chat_session.conversation
        ]
        simplified_history.append(
            {"participant_type": ParticipantType.user.value, "utterance": user_message}
        )
        impl = chat_approaches.get(approach_type.name)
        if not impl:
            return jsonify({"error": "unknown approach"}), 400
        response = impl.run(
            simplified_history,
            bot_config,
            openai_client,
            overrides or None,
        )
        # state store update — only persist the exchange when the approach succeeded
        if not response.error:
            data_client.add_dialog_to_chat_session(
                user_id,
                conversation_id,
                ParticipantType.user,
                datetime.datetime.now(),
                user_message,
                question_classification,
            )
            logger.info(
                f"added dialog to chat session for user {user_id} and session {conversation_id}",
                extra=properties,
            )
            data_client.add_dialog_to_chat_session(
                user_id,
                conversation_id,
                ParticipantType.assistant,
                datetime.datetime.now(),
                json.dumps(response.answer.to_item()),
                question_classification,
            )
            logger.info(
                f"added response {response.answer.formatted_answer} to chat session for user {user_id} and session {conversation_id}",
                extra=properties,
            )
        return jsonify(response.to_item())
    except OutOfScopeException as e:
        logger.exception(f"Exception in /chat: {str(e)}", extra=properties)
        # Offer a retry with the suggested classification only when the user
        # is actually allowed to use it.
        if access_manager.is_user_allowed(
            allowed_approaches, e.suggested_classification
        ):
            response = ChatResponse(
                answer=Answer(
                    f"Error when querying knowledge-base: '{str(e.message)}'."
                ),
                show_retry=True,
                suggested_classification=e.suggested_classification,
                classification=question_classification,
            )
            return jsonify(response.to_item())
        else:
            response = ChatResponse(
                answer=Answer(str(e.message)), classification=question_classification
            )
            return jsonify(response.to_item())
    except UnauthorizedDBAccessException as e:
        # BUGFIX: log message previously doubled the exception name.
        logger.exception(
            f"UnauthorizedDBAccessException in /chat: {str(e)}",
            extra=properties,
        )
        response = ChatResponse(answer=Answer(), error=str(e.message))
        return jsonify(response.to_item()), 403
    except ContentFilterException as e:
        logger.exception(f"ContentFilterException in /chat: {str(e)}", extra=properties)
        response = ChatResponse(answer=Answer(), error=str(e.message))
        return jsonify(response.to_item()), 400
    except Exception as e:
        logger.exception(f"Exception in /chat: {e}", extra=properties)
        response = ChatResponse(answer=Answer(), error=str(e), show_retry=True)
        return jsonify(response.to_item()), 500
@app.route("/user-profiles", methods=["GET"])
def get_all_user_profiles():
    """Return every known user profile as a JSON list."""
    try:
        profiles = data_client.get_all_user_profiles()
        return jsonify([profile.to_item() for profile in profiles])
    except Exception as e:
        logger.exception(f"Exception in /user-profiles: {e}")
        return jsonify({"error": str(e)}), 500
@app.route("/chat-sessions/<user_id>/<conversation_id>", methods=["DELETE"])
def clear_chat_session(user_id: str, conversation_id: str):
    """Delete all dialog history for the given user/conversation pair."""
    properties = logger.get_updated_properties(
        {"user_id": user_id, "conversation_id": conversation_id}
    )
    try:
        data_client.clear_chat_session(user_id, conversation_id)
        # plain string: the former f-string had no placeholders
        logger.info("cleared chat session.", extra=properties)
        return jsonify({"message": "cleared chat session"})
    except Exception as e:
        logger.exception(
            f"Exception in /chat-sessions/<user_id>/<conversation_id>: {e}"
        )
        return jsonify({"error": str(e)}), 500
@app.route("/search-settings", methods=["GET"])
def get_search_settings():
    """Report whether vectorization is enabled.

    Derived from the SEARCH_SKIP_VECTORIZATION config flag, which must be
    'true' or 'false' (case-insensitive); enabled is the flag's inverse.
    """
    try:
        skip_vectorization_str = DefaultConfig.SEARCH_SKIP_VECTORIZATION
        flag = skip_vectorization_str.lower()
        # Replaces a hard-to-read nested conditional expression with an
        # explicit validity check; behavior is unchanged.
        if flag not in ("true", "false"):
            raise Exception(
                f"Invalid value for SEARCH_SKIP_VECTORIZATION: {skip_vectorization_str}. Must be either 'true' or 'false'"
            )
        # The flag says "skip vectorization", so enabled is its inverse.
        vectorization_enabled = flag == "false"
        search_settings = SearchSettings(vectorization_enabled)
        return jsonify(search_settings.to_item())
    except Exception as e:
        logger.exception(f"Exception in /search-settings: {e}")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Flask development server; a production deployment would use a WSGI server.
    app.run()
# CMW: This is in app/backend
#!/usr/bin/env python3
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import os
from datetime import datetime
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from common.logging.log_helper import CustomLogger
from dotenv import load_dotenv
# load value from .env file if it exists, unless deploying in a production environment
if os.getenv("ENVIRONMENT") != "PROD":
    # override=True lets the .env file win over variables already set in the shell
    load_dotenv(override=True, dotenv_path=f"{os.getcwd()}/backend/.env")
class Config_Reader():
    """Reads configuration values from environment variables, falling back to
    Azure Key Vault (at KEYVAULT_URI) when a value is missing or empty."""

    def __init__(self, logger: "CustomLogger") -> None:
        self.logger = logger

    def set_logger(self, logger: "CustomLogger"):
        self.logger = logger

    def read_config_value(self, key_name: str) -> str:
        """Return the config value for key_name from the environment or Key Vault."""
        return self._get_config_value(key_name)

    def _get_secret_from_keyvault(self, key_name: str):
        """Fetch a secret from Key Vault; env-style underscores map to dashes."""
        KEYVAULT_URI = os.getenv("KEYVAULT_URI", "")
        credential = DefaultAzureCredential()
        key_name = key_name.replace("_", "-")
        client = SecretClient(vault_url=KEYVAULT_URI, credential=credential)
        return client.get_secret(key_name).value

    def _get_config_value(self, key_name: str) -> str:
        """Resolve key_name from the environment, else Key Vault; raise if absent."""
        value = os.getenv(key_name, None)
        if value is None or value == "":
            start = datetime.now()
            value = self._get_secret_from_keyvault(key_name)
            end = datetime.now()
            # BUGFIX: .microseconds only exposes the sub-second component, so
            # lookups over one second were under-reported; total_seconds()
            # gives the full elapsed time in milliseconds.
            duration = (end - start).total_seconds() * 1000
            addl_dimension = {"keyvault_duration": duration}
            if self.logger:
                add_props = self.logger.get_updated_properties(addl_dimension)
                self.logger.info(f"key name: {key_name}, keyvault_duration: {duration}", extra=add_props)
        if value is None:
            # BUGFIX: report the missing key name (the old message interpolated
            # the value, which is None here).
            if self.logger:
                self.logger.error(f"Necessary value {key_name} couldn't be found in environment or Key Vault")
            raise Exception(f"Necessary value {key_name} couldn't be found in environment or Key Vault")
        return value
class DefaultConfig:
    """Application configuration loaded once via initialize(); every value is
    exposed as a class attribute, including the shared CustomLogger."""

    _initialized = False

    @staticmethod
    def _int_env(name: str, default: int) -> int:
        # Single env lookup (the old code called os.getenv twice per setting);
        # unset or empty string falls back to the default.
        raw = os.getenv(name)
        if raw is None or raw == "":
            return default
        return int(raw)

    @classmethod
    def initialize(cls):
        """Load all config values (environment / Key Vault) and set up logging.

        Safe to call repeatedly; only the first call does any work.
        """
        if cls._initialized:
            return
        config_reader = Config_Reader(None)
        # CMW: Put this on the class so can use it from app.py
        cls.APPLICATION_INSIGHTS_CNX_STR = config_reader.read_config_value("APPLICATION-INSIGHTS-CNX-STR")
        cls.logger = CustomLogger(cls.APPLICATION_INSIGHTS_CNX_STR)
        cls.logger.set_conversation_and_dialog_ids("BACKEND_APP", "NONE")
        config_reader.set_logger(cls.logger)
        try:
            # cls.logger.info(f"APPINSIGHTSINFO: {cls.APPLICATION_INSIGHTS_CNX_STR}")
            cls.AZURE_OPENAI_GPT4_SERVICE = config_reader.read_config_value("AZURE-OPENAI-GPT4-SERVICE")
            cls.AZURE_OPENAI_GPT4_API_KEY = config_reader.read_config_value("AZURE-OPENAI-GPT4-API-KEY")
            cls.AZURE_OPENAI_CLASSIFIER_SERVICE = config_reader.read_config_value("AZURE-OPENAI-CLASSIFIER-SERVICE")
            cls.AZURE_OPENAI_CLASSIFIER_API_KEY = config_reader.read_config_value("AZURE-OPENAI-CLASSIFIER-API-KEY")
            cls.AZURE_OPENAI_EMBEDDINGS_SERVICE = config_reader.read_config_value("AZURE-OPENAI-EMBEDDINGS-SERVICE")
            cls.AZURE_OPENAI_EMBEDDINGS_API_KEY = config_reader.read_config_value("AZURE-OPENAI-EMBEDDINGS-API-KEY")
            cls.AZURE_SEARCH_SERVICE = config_reader.read_config_value("AZURE-SEARCH-SERVICE")
            cls.AZURE_SEARCH_INDEX = config_reader.read_config_value("AZURE-SEARCH-INDEX")
            cls.AZURE_SEARCH_KEY = config_reader.read_config_value("AZURE-SEARCH-KEY")
            cls.KB_FIELDS_CONTENT = config_reader.read_config_value("KB-FIELDS-CONTENT")
            cls.KB_FIELDS_CATEGORY = config_reader.read_config_value("KB-FIELDS-CATEGORY")
            cls.KB_FIELDS_SOURCEPAGE = config_reader.read_config_value("KB-FIELDS-SOURCEPAGE")
            cls.SEARCH_SKIP_VECTORIZATION = config_reader.read_config_value("SEARCH-SKIP-VECTORIZATION")
            cls.AZURE_STORAGE_ACCOUNT = config_reader.read_config_value("AZURE-STORAGE-ACCOUNT")
            cls.AZURE_STORAGE_CONTAINER = config_reader.read_config_value("AZURE-STORAGE-CONTAINER")
            cls.AZURE_BLOB_CONNECTION_STRING = config_reader.read_config_value("AZURE-BLOB-CONNECTION-STRING")
            cls.DATA_SERVICE_URI = config_reader.read_config_value("DATA-SERVICE-URI")
            cls.SQL_CONNECTION_STRING = config_reader.read_config_value("SQL-CONNECTION-STRING")
            cls.RATIO_OF_INDEX_TO_HISTORY = cls._int_env("RATIO_OF_INDEX_TO_HISTORY", 5)
            cls.SEARCH_THRESHOLD_PERCENTAGE = cls._int_env("SEARCH_THRESHOLD_PERCENTAGE", 50)
            cls.logger.info(f"SEARCH_THRESHOLD_PERCENTAGE: {cls.SEARCH_THRESHOLD_PERCENTAGE}")
            cls._initialized = True
        except Exception as e:
            cls.logger.error(f"Error while loading config: {e}")
            raise e
# CMW: This is in app/common/logging
import logging
from copy import deepcopy
# CMW: removed from opencensus.... and replaced with
from azure.monitor.opentelemetry import configure_azure_monitor
from opentelemetry._logs import set_logger_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter
class CustomLogger(logging.LoggerAdapter):
    """LoggerAdapter that prefixes every message with the conversation id and
    carries custom dimensions (conversation_id, dialog_id, ...) to Azure
    Monitor via the OpenTelemetry log exporter."""

    def __init__(self, app_insights_cnx_str):
        """app_insights_cnx_str: Application Insights connection string; when
        falsy, only a console handler is attached."""
        # NOTE: deliberately skips super().__init__; logger and extra are
        # assigned directly below.
        self.app_insights_cnx_str = app_insights_cnx_str
        # Placeholder dimensions until set_conversation_and_dialog_ids is called.
        self.extra = {
            "custom_dimensions": {
                "conversation_id": "please set conversation_id before logging",
                "dialog_id": "please set dialog_id before logging"
            }
        }
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG)
        self.initialize_loggers()

    def set_conversation_id(self, conversation_id: str):
        self.extra["custom_dimensions"]["conversation_id"] = conversation_id

    def set_dialog_id(self, dialog_id: str):
        self.extra["custom_dimensions"]["dialog_id"] = dialog_id

    def set_conversation_and_dialog_ids(self, conversation_id: str, dialog_id: str):
        self.set_conversation_id(conversation_id)
        self.set_dialog_id(dialog_id)

    # NOTE(review): method name keeps its historical typo ("converation")
    # because external callers may depend on it.
    def get_converation_and_dialog_ids(self) -> dict:
        return {
            "conversation_id": self.extra["custom_dimensions"]["conversation_id"],
            "dialog_id": self.extra["custom_dimensions"]["dialog_id"]
        }

    def initialize_loggers(self):
        """Attach the Azure Monitor (OpenTelemetry) handler and a console
        handler, skipping any handler type already present on the shared
        module logger (handlers accumulate across CustomLogger instances)."""
        if self.app_insights_cnx_str:
            # add appInsights logger if it is not already added by another instance of CustomLogger
            # CMW: Is there something better to check for than just generic LoggingHandler?
            if not any(isinstance(handler, LoggingHandler) for handler in self.logger.handlers):
                # CMW: Added this new opentelemetry based code
                logger_provider = LoggerProvider()
                set_logger_provider(logger_provider)
                exporter = AzureMonitorLogExporter(
                    connection_string=self.app_insights_cnx_str
                )
                logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))
                self.logger.addHandler(LoggingHandler())
        # add console logger if it is not already added by another instance of CustomLogger
        if not any(isinstance(handler, logging.StreamHandler) for handler in self.logger.handlers):
            console_handler = logging.StreamHandler()
            console_handler.setLevel(logging.DEBUG)
            self.logger.addHandler(console_handler)

    def process(self, msg, kwargs):
        """
        Extract conversation_id and include it in the log message
        """
        extra = kwargs.get("extra") or self.extra
        if "custom_dimensions" in extra:
            conversation_id = extra["custom_dimensions"]["conversation_id"]
        else:
            conversation_id = "Conversation_id NOT SET while logging"
        # include all properties except custom dimensions in extra dictionary to message
        for key, value in extra.items():
            if key != "custom_dimensions":
                msg = msg + f", {key}: {value}"
        return f"Conversation_id: {conversation_id}, {msg}", kwargs

    def get_updated_properties(self, additional_custom_dimensions: dict, additional_properties: dict = None) -> dict:
        """Return a copy of `extra` merged with the given properties and
        custom dimensions; the adapter's own state is not mutated."""
        # BUGFIX: replaced a shared mutable default argument ({}) with None.
        if additional_properties is None:
            additional_properties = {}
        custom_dimensions = self.get_updated_custom_dimensions(additional_custom_dimensions)
        properties = deepcopy(self.extra)
        for key, value in additional_properties.items():
            if key != "custom_dimensions":
                properties[key] = value
        properties["custom_dimensions"] = custom_dimensions
        return properties

    def get_updated_custom_dimensions(self, additional_custom_dimensions: dict) -> dict:
        """Return a copy of the current custom dimensions merged with the
        given additions."""
        dimensions = deepcopy(self.extra["custom_dimensions"])
        dimensions.update(additional_custom_dimensions)
        return dimensions
# CMW removed opencensus items
# added
azure-monitor-opentelemetry==1.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment