-
-
Save ericchaves/2bec0713c3590d200a4de0d362ca0682 to your computer and use it in GitHub Desktop.
setup strands agent with two different service collectors: phoenix oss (agent and llm) and jaeger (application)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import boto3 | |
| import json | |
| from typing import Dict, Any, Optional | |
| from pydantic import ValidationError | |
| # OpenTelemetry imports | |
| from opentelemetry import trace | |
| from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor | |
| from opentelemetry.trace import Status, StatusCode | |
| from openinference.semconv.trace import SpanAttributes | |
| # Imports do agente | |
| from strands import Agent | |
| from strands.models.litellm import LiteLLMModel | |
| from strands.agent.conversation_manager import SlidingWindowConversationManager | |
| from strands.session.s3_session_manager import S3SessionManager | |
| from agent_utils.prompts import carregar_prompt | |
| from agent_utils.schemas import AgentOutput, UserInput | |
| from .config import settings | |
| from .schemas import TriagemOutput | |
| from .otel import tracer, phoenix_tracer, agent_ctx | |
| boto_session = boto3.Session( | |
| region_name=settings.aws_default_region if settings.aws_default_region else None, | |
| ) | |
| s3_client = boto_session.client("s3") | |
| conversation_manager = SlidingWindowConversationManager( | |
| window_size=settings.agents.conversation_window_size, | |
| ) | |
| model = LiteLLMModel( | |
| client_args={ | |
| "api_key": settings.agents.litellm_api_key, | |
| "api_base": settings.agents.litellm_api_base_url, | |
| "use_litellm_proxy": True | |
| }, | |
| model_id=settings.agents.litellm_model | |
| ) | |
| @tracer.start_as_current_span("lambda_handler") | |
| def handler(event: Dict[str, Any], _context) -> Dict[str, Any]: | |
| """Lambda handler - invoca o agente""" | |
| # 1. processa evento | |
| span = trace.get_current_span() | |
| span.add_event("fazendo parsing do evento") | |
| try: | |
| # Suporte híbrido: API Gateway (body string) ou Direct Invocation (dict raw) | |
| if "body" in event and isinstance(event.get("body"), str): | |
| body = json.loads(event["body"]) | |
| else: | |
| body = event | |
| user_input = UserInput(**body) | |
| # Normaliza mensagem para string se for lista | |
| if isinstance(user_input.mensagem, list): | |
| message_str = "\n".join(user_input.mensagem) | |
| else: | |
| message_str = user_input.mensagem | |
| except (json.JSONDecodeError, ValidationError) as e: | |
| span.record_exception(e) | |
| span.set_status(trace.Status(trace.StatusCode.ERROR)) | |
| return { | |
| "statusCode": 400, | |
| "body": json.dumps({"erro": "Input invalido", "detalhes": str(e)}) | |
| } | |
| # 2. Fluxo Principal | |
| try: | |
| session_params = { | |
| "session_id": user_input.session_id, | |
| "bucket": settings.agents.session_bucket, | |
| "prefix": f"{settings.agents.session_prefix}/agente-triagem", | |
| } | |
| span.add_event("configurando session manager", attributes=session_params) | |
| # S3SessionManager é leve, ok instanciar por request | |
| session_manager = S3SessionManager( | |
| **session_params, | |
| boto_session=boto_session, | |
| ) | |
| # Prepara a versão: se for None, 0 ou "latest" nas settings, passamos None para pegar a última | |
| versao_prompt = settings.agents.triagem_system_prompt_version | |
| if versao_prompt in (None, 0, "latest", ""): | |
| versao_prompt = None | |
| system_prompt_params = { | |
| "prompt_uri": settings.agents.triagem_system_prompt_uri, | |
| "versao": versao_prompt, | |
| } | |
| # Remove None values for OTel attributes | |
| span.add_event("carregando system_prompt", attributes={k: v for k, v in system_prompt_params.items() if v is not None}) | |
| system_prompt = carregar_prompt(**system_prompt_params) | |
| span.add_event("configurando agente", attributes={ | |
| "model": settings.agents.litellm_model, | |
| "conversation_window_size": settings.agents.conversation_window_size, | |
| }) | |
| with phoenix_tracer.start_as_current_span("agent_handler", context=agent_ctx) as agent_span: | |
| try: | |
| agent_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "agent") | |
| agent_span.set_attribute(SpanAttributes.INPUT_VALUE, message_str) | |
| agent_span.set_attribute("session.id", user_input.session_id) | |
| agent_span.set_attribute("user.message", message_str) | |
| # requerido pelo phoenix arize | |
| agent_span.set_attribute("input.value", message_str) | |
| agent_span.set_attribute("input.mime_type", "text/plain") | |
| trace_attributes = { | |
| "session.id": user_input.session_id, | |
| "arize.tags": [ | |
| "Liftcred-Agents", | |
| ] | |
| } | |
| triagem_agent = Agent( | |
| system_prompt=system_prompt, | |
| model=model, | |
| session_manager=session_manager, | |
| conversation_manager=conversation_manager, | |
| structured_output_model=TriagemOutput, | |
| tools=[], | |
| trace_attributes=trace_attributes, | |
| ) | |
| # Executa o agente | |
| result = triagem_agent(message_str) | |
| if not result.structured_output: | |
| span.add_event("erro_output_estruturado_ausente") | |
| agent_span.add_event("erro_output_estruturado_ausente") | |
| return { | |
| "statusCode": 500, | |
| "body": json.dumps({"erro": "Agente não retornou output estruturado"}) | |
| } | |
| output_data = result.structured_output.model_dump() | |
| agent_output = AgentOutput( | |
| **output_data, | |
| metadata={}, | |
| ) | |
| # requerido pelo phoenix arize | |
| agent_span.set_status(Status(StatusCode.OK)) | |
| agent_span.set_attribute(SpanAttributes.OUTPUT_VALUE, agent_output.model_dump_json()) | |
| agent_span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, "application/json") | |
| except Exception as err: | |
| agent_span.record_exception(err) | |
| agent_span.set_status(trace.Status(trace.StatusCode.ERROR)) | |
| raise err | |
| # Enriquecer metadata com métricas para avaliação | |
| if result.metrics: | |
| summary = result.metrics.get_summary() | |
| agent_output.metadata.update(summary) | |
| span.set_status(Status(StatusCode.OK)) | |
| return { | |
| "statusCode": 200, | |
| "headers": {"Content-Type": "application/json"}, | |
| "body": agent_output.model_dump_json() | |
| } | |
| except Exception as e: | |
| span.set_status(trace.Status(trace.StatusCode.ERROR, str(e))) | |
| span.record_exception(e) | |
| print(f"Erro Crítico: {e}") # OTel já capturou, mas print é útil para CloudWatch Logs simples | |
| return { | |
| "statusCode": 500, | |
| "body": json.dumps({"erro": "Erro interno de processamento"}) | |
| } | |
| # instrumentaliza lambda | |
| AwsLambdaInstrumentor().instrument() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| from opentelemetry import trace, context | |
| from opentelemetry.sdk.trace import TracerProvider | |
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | |
| from opentelemetry.sdk.resources import Resource, SERVICE_NAME | |
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
| from strands.telemetry import StrandsTelemetry | |
| from phoenix.otel import register | |
| # Instrumentações Específicas | |
| # from opentelemetry.instrumentation.redis import RedisInstrumentor | |
| # from opentelemetry.instrumentation.botocore import BotocoreInstrumentor | |
| # from opentelemetry.instrumentation.requests import RequestsInstrumentor | |
| from openinference.instrumentation.litellm import LiteLLMInstrumentor | |
| # Phoenix tracer provider para LLM traces (LiteLLM, Strands) | |
| # Usa PHOENIX_COLLECTOR_ENDPOINT e PHOENIX_GRPC_PORT para configurar o endpoint gRPC | |
| phoenix_project_name = os.environ.get( | |
| "PHOENIX_PROJECT_NAME", | |
| os.environ.get("OTEL_SERVICE_NAME", "agente-triagem") | |
| ) | |
| agent_ctx = context.Context() | |
| phoenix_tracer_provider = register( | |
| project_name=phoenix_project_name, | |
| set_global_tracer_provider=False, | |
| protocol="grpc", | |
| batch=True, | |
| ) | |
| phoenix_tracer = phoenix_tracer_provider.get_tracer(__name__) | |
| strands_telemetry = StrandsTelemetry(tracer_provider=phoenix_tracer_provider) | |
| strands_telemetry.setup_meter(enable_otlp_exporter=True) | |
| LiteLLMInstrumentor().instrument(tracer_provider=phoenix_tracer_provider) | |
| # App tracer provider para Grafana OTEL LGTM via OTLP HTTP | |
| app_service_name = os.environ.get("OTEL_SERVICE_NAME", "agente-triagem") | |
| app_resource = Resource.create({SERVICE_NAME: app_service_name}) | |
| app_provider = TracerProvider(resource=app_resource) | |
| processor = BatchSpanProcessor(OTLPSpanExporter()) | |
| app_provider.add_span_processor(processor) | |
| # Sets the global default tracer provider to App (Grafana OTEL LGTM) | |
| trace.set_tracer_provider(app_provider) | |
| tracer = trace.get_tracer(__name__) | |
| # RedisInstrumentor.instrument(tracer_provider=tracer) | |
| # RequestsInstrumentor().instrument(tracer_provider=tracer) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment