Set up a Strands agent with two different trace collectors: Phoenix OSS (agent and LLM spans) and Jaeger (application spans). The first file is the Lambda handler; the second is the otel module it imports, which wires up both tracer providers.
import boto3
import json
from typing import Dict, Any, Optional
from pydantic import ValidationError
# OpenTelemetry imports
from opentelemetry import trace
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor
from opentelemetry.trace import Status, StatusCode
from openinference.semconv.trace import SpanAttributes
# Agent imports
from strands import Agent
from strands.models.litellm import LiteLLMModel
from strands.agent.conversation_manager import SlidingWindowConversationManager
from strands.session.s3_session_manager import S3SessionManager
from agent_utils.prompts import carregar_prompt
from agent_utils.schemas import AgentOutput, UserInput
from .config import settings
from .schemas import TriagemOutput
from .otel import tracer, phoenix_tracer, agent_ctx
boto_session = boto3.Session(
    region_name=settings.aws_default_region if settings.aws_default_region else None,
)
s3_client = boto_session.client("s3")
conversation_manager = SlidingWindowConversationManager(
    window_size=settings.agents.conversation_window_size,
)
model = LiteLLMModel(
    client_args={
        "api_key": settings.agents.litellm_api_key,
        "api_base": settings.agents.litellm_api_base_url,
        "use_litellm_proxy": True,
    },
    model_id=settings.agents.litellm_model,
)
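
# Settings fields this module reads from .config (shape inferred from usage in
# this file; the config module itself is not included in the gist):
#   settings.aws_default_region
#   settings.agents.litellm_api_key, litellm_api_base_url, litellm_model
#   settings.agents.conversation_window_size
#   settings.agents.session_bucket, session_prefix
#   settings.agents.triagem_system_prompt_uri, triagem_system_prompt_version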

@tracer.start_as_current_span("lambda_handler")
def handler(event: Dict[str, Any], _context) -> Dict[str, Any]:
    """Lambda handler - invokes the agent."""
    # 1. Parse the event
    span = trace.get_current_span()
    span.add_event("fazendo parsing do evento")
    try:
        # Hybrid support: API Gateway (string body) or direct invocation (raw dict)
        if "body" in event and isinstance(event.get("body"), str):
            body = json.loads(event["body"])
        else:
            body = event
        user_input = UserInput(**body)
        # Normalize the message to a string if it arrives as a list
        if isinstance(user_input.mensagem, list):
            message_str = "\n".join(user_input.mensagem)
        else:
            message_str = user_input.mensagem
    except (json.JSONDecodeError, ValidationError) as e:
        span.record_exception(e)
        span.set_status(trace.Status(trace.StatusCode.ERROR))
        return {
            "statusCode": 400,
            "body": json.dumps({"erro": "Input invalido", "detalhes": str(e)})
        }
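
    # Illustrative payloads accepted above (field names come from UserInput;
    # the values themselves are hypothetical):
    #   API Gateway:       {"body": "{\"session_id\": \"abc-123\", \"mensagem\": \"Olá\"}"}
    #   Direct invocation: {"session_id": "abc-123", "mensagem": ["linha 1", "linha 2"]}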
    # 2. Main flow
    try:
        session_params = {
            "session_id": user_input.session_id,
            "bucket": settings.agents.session_bucket,
            "prefix": f"{settings.agents.session_prefix}/agente-triagem",
        }
        span.add_event("configurando session manager", attributes=session_params)
        # S3SessionManager is lightweight; instantiating it per request is fine
        session_manager = S3SessionManager(
            **session_params,
            boto_session=boto_session,
        )
        # Prepare the prompt version: if settings hold None, 0, "latest", or "",
        # pass None to fetch the latest version
        versao_prompt = settings.agents.triagem_system_prompt_version
        if versao_prompt in (None, 0, "latest", ""):
            versao_prompt = None
        system_prompt_params = {
            "prompt_uri": settings.agents.triagem_system_prompt_uri,
            "versao": versao_prompt,
        }
        # Drop None values: OTel attributes must not be None
        span.add_event("carregando system_prompt", attributes={k: v for k, v in system_prompt_params.items() if v is not None})
        system_prompt = carregar_prompt(**system_prompt_params)
        span.add_event("configurando agente", attributes={
            "model": settings.agents.litellm_model,
            "conversation_window_size": settings.agents.conversation_window_size,
        })
        # Start the agent span on the Phoenix provider, rooted in the empty
        # agent_ctx so it forms a new trace instead of nesting under the app trace
        with phoenix_tracer.start_as_current_span("agent_handler", context=agent_ctx) as agent_span:
            try:
                agent_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, "agent")
                agent_span.set_attribute(SpanAttributes.INPUT_VALUE, message_str)
                agent_span.set_attribute("session.id", user_input.session_id)
                agent_span.set_attribute("user.message", message_str)
                # Required by Arize Phoenix
                agent_span.set_attribute("input.value", message_str)
                agent_span.set_attribute("input.mime_type", "text/plain")
                trace_attributes = {
                    "session.id": user_input.session_id,
                    "arize.tags": [
                        "Liftcred-Agents",
                    ],
                }
                triagem_agent = Agent(
                    system_prompt=system_prompt,
                    model=model,
                    session_manager=session_manager,
                    conversation_manager=conversation_manager,
                    structured_output_model=TriagemOutput,
                    tools=[],
                    trace_attributes=trace_attributes,
                )
                # Run the agent
                result = triagem_agent(message_str)
                if not result.structured_output:
                    span.add_event("erro_output_estruturado_ausente")
                    agent_span.add_event("erro_output_estruturado_ausente")
                    return {
                        "statusCode": 500,
                        "body": json.dumps({"erro": "Agente não retornou output estruturado"})
                    }
                output_data = result.structured_output.model_dump()
                agent_output = AgentOutput(
                    **output_data,
                    metadata={},
                )
                # Required by Arize Phoenix
                agent_span.set_status(Status(StatusCode.OK))
                agent_span.set_attribute(SpanAttributes.OUTPUT_VALUE, agent_output.model_dump_json())
                agent_span.set_attribute(SpanAttributes.OUTPUT_MIME_TYPE, "application/json")
            except Exception as err:
                agent_span.record_exception(err)
                agent_span.set_status(trace.Status(trace.StatusCode.ERROR))
                raise
        # Enrich metadata with metrics for evaluation
        if result.metrics:
            summary = result.metrics.get_summary()
            agent_output.metadata.update(summary)
        span.set_status(Status(StatusCode.OK))
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": agent_output.model_dump_json()
        }
    except Exception as e:
        span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
        span.record_exception(e)
        print(f"Erro Crítico: {e}")  # OTel already recorded it, but print shows up in plain CloudWatch Logs
        return {
            "statusCode": 500,
            "body": json.dumps({"erro": "Erro interno de processamento"})
        }

# Instrument the Lambda handler
AwsLambdaInstrumentor().instrument()
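
# Minimal local smoke test -- a sketch, not part of the original handler; it
# assumes AWS credentials, the S3 session bucket, and both collector endpoints
# are reachable, and the payload below is hypothetical.
if __name__ == "__main__":
    test_event = {"session_id": "local-test", "mensagem": "Olá, preciso de ajuda"}
    print(handler(test_event, None))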

# --- otel module (imported above as .otel) ---
import os
from opentelemetry import trace, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from strands.telemetry import StrandsTelemetry
from phoenix.otel import register
# Specific instrumentations
# from opentelemetry.instrumentation.redis import RedisInstrumentor
# from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
# from opentelemetry.instrumentation.requests import RequestsInstrumentor
from openinference.instrumentation.litellm import LiteLLMInstrumentor
# Phoenix tracer provider for LLM traces (LiteLLM, Strands).
# Uses PHOENIX_COLLECTOR_ENDPOINT and PHOENIX_GRPC_PORT to configure the gRPC endpoint.
phoenix_project_name = os.environ.get(
    "PHOENIX_PROJECT_NAME",
    os.environ.get("OTEL_SERVICE_NAME", "agente-triagem")
)
# Empty root context: agent spans started under it begin a new trace in Phoenix
# instead of nesting under the application trace
agent_ctx = context.Context()
phoenix_tracer_provider = register(
    project_name=phoenix_project_name,
    set_global_tracer_provider=False,
    protocol="grpc",
    batch=True,
)
phoenix_tracer = phoenix_tracer_provider.get_tracer(__name__)
strands_telemetry = StrandsTelemetry(tracer_provider=phoenix_tracer_provider)
strands_telemetry.setup_meter(enable_otlp_exporter=True)
LiteLLMInstrumentor().instrument(tracer_provider=phoenix_tracer_provider)
# App tracer provider for Grafana OTEL LGTM via OTLP HTTP
app_service_name = os.environ.get("OTEL_SERVICE_NAME", "agente-triagem")
app_resource = Resource.create({SERVICE_NAME: app_service_name})
app_provider = TracerProvider(resource=app_resource)
processor = BatchSpanProcessor(OTLPSpanExporter())
app_provider.add_span_processor(processor)
# Set the global default tracer provider to the app provider (Grafana OTEL LGTM)
trace.set_tracer_provider(app_provider)
tracer = trace.get_tracer(__name__)
# RedisInstrumentor().instrument(tracer_provider=app_provider)
# RequestsInstrumentor().instrument(tracer_provider=app_provider)
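
# Example environment for the two collectors (illustrative values only; the
# hosts and ports below are assumptions, not taken from this gist):
#   PHOENIX_COLLECTOR_ENDPOINT=http://phoenix:6006   # Phoenix OSS collector
#   PHOENIX_GRPC_PORT=4317                           # gRPC port used by register()
#   PHOENIX_PROJECT_NAME=agente-triagem
#   OTEL_SERVICE_NAME=agente-triagem
#   OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4318   # OTLP HTTP, read by OTLPSpanExporter()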