Skip to content

Instantly share code, notes, and snippets.

@Karthikeya-Meesala
Last active November 7, 2024 18:58
Show Gist options
  • Save Karthikeya-Meesala/2bde7f87688d9ea8a2e930b862382a45 to your computer and use it in GitHub Desktop.
Save Karthikeya-Meesala/2bde7f87688d9ea8a2e930b862382a45 to your computer and use it in GitHub Desktop.
OPENAI_API_KEY=""
COMPOSIO_API_KEY=""
LANGFUSE_SECRET_KEY=''
LANGFUSE_PUBLIC_KEY=''
LANGFUSE_HOST=''
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from langchain_core.tools import tool
from langchain_core.utils.function_calling import convert_to_openai_tool
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain_openai import ChatOpenAI
from langchain_core.runnables.config import RunnableConfig
from langfuse import Langfuse
from langfuse.callback import CallbackHandler
from langserve import add_routes
from langchain.agents import AgentExecutor
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from pydantic import BaseModel, Field
from langchain_core.prompts import MessagesPlaceholder
from composio_langchain import ComposioToolSet, App, Action
from langchain.prompts import ChatPromptTemplate
from typing import Optional, List
import os
from dotenv import load_dotenv
import time
# Load variables from a local .env file into the process environment.
# The clients below are constructed without explicit credentials, so they
# presumably read their keys from the environment -- confirm per library.
load_dotenv()

# HTTP application that hosts the agent endpoint.
app = FastAPI(
    title="My API",
    version="1.0.0",
)

# Initialize tools: a Composio toolset restricted to the Gmail send-email action.
composio_toolset = ComposioToolSet()
gmail_tools = composio_toolset.get_tools(
    actions=[
        Action.GMAIL_SEND_EMAIL,
    ],
)
@tool
def word_length(word: str) -> int:
    """Returns the length of a word"""
    # NOTE: the docstring above is runtime text -- the `tool` decorator uses it
    # as the description of this tool, so its wording is kept unchanged.
    character_count = len(word)
    return character_count
# Combine all tools: the local helper first, followed by the Gmail actions.
tools = [word_length, *gmail_tools]

print("Create integration in the dashboard")

# Initialize Langfuse: a client object plus the LangChain callback handler
# that is attached to the runnables below.
langfuse = Langfuse()
langfuse_handler = CallbackHandler()

# Initialize LLM with Langfuse: the base chat model and a configured copy
# that carries the Langfuse callback.
# NOTE(review): `llm_with_langfuse` is not referenced by the rest of the
# visible file; the agent is built from `llm` directly.
llm = ChatOpenAI()
config = RunnableConfig(
    callbacks=[langfuse_handler],
)
llm_with_langfuse = llm.with_config(config)
# Define input model
# Request body accepted by the POST endpoint that runs the agent.
class AgentInput(BaseModel):
    # Natural-language instruction handed to the agent.
    input: str = Field(..., description="The input message for the agent")
    # Entity id the Gmail connection is initiated for.
    entity: str = Field(..., description="The entity id for the agent")
    # Integration id passed along when the connection is initiated.
    integration_id: str = Field(..., description="The integration id for the agent")

    class Config:
        # Example payload published with the model's JSON schema. All three
        # fields are required, so the example must list every one of them;
        # otherwise the documented example is rejected by validation.
        json_schema_extra = {
            "examples": [
                {
                    "input": "Send an email to karan.v@composio.dev saying hi",
                    "entity": "default",
                    "integration_id": "8cf69565-def0-4109-ad72-2051f522035e",
                }
            ]
        }
# Create prompt template: system instructions, the user's request, and a
# placeholder that receives the agent's intermediate tool messages.
prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            "You are a helpful assistant that can send emails and perform other tasks on gmail.\n"
            "When sending emails, make sure to validate the email address format first.",
        ),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

# Initialize LLM with tools: convert every tool to the OpenAI tool format
# and bind the resulting specs to the chat model.
_openai_tool_specs = [convert_to_openai_tool(candidate) for candidate in tools]
llm_with_tools = llm.bind(tools=_openai_tool_specs)

# Create agent: map the executor's payload onto the prompt variables, then
# pipe prompt -> tool-aware model -> tool-call output parser.
agent = (
    {
        "input": lambda payload: payload["input"],
        "agent_scratchpad": lambda payload: format_to_openai_tool_messages(
            payload["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIToolsAgentOutputParser()
)

# Create agent executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
)

# Add Langfuse tracking to agent: run name plus the Langfuse callback.
agent_config = RunnableConfig(
    run_name="email_agent",
    callbacks=[langfuse_handler],
)
@app.post("/test-agent/invoke")
async def create_item(agent_input: AgentInput):
    """Make sure a Gmail connection exists for the entity, then run the agent.

    The handler initiates a connection for ``agent_input.entity`` and
    ``agent_input.integration_id``. If the connection is still ``INITIATED``
    it prints the redirect URL and waits (up to 120 seconds) for it to become
    active; if it is already ``ACTIVE`` it proceeds immediately. In both cases
    the agent is then invoked with ``agent_input.input``.

    Args:
        agent_input: Validated request body.

    Returns:
        ``{"result": <agent output>}`` on success.

    Raises:
        HTTPException: 502 when the connection reports any status other than
            ``INITIATED`` or ``ACTIVE``; 500 (with the error text as detail)
            for any other failure while connecting or running the agent.
    """
    # Function-scope imports keep the file's top-level import block untouched.
    import asyncio

    from fastapi.concurrency import run_in_threadpool

    try:
        # Initiate the connection. The toolset call is synchronous, so it is
        # pushed to a worker thread instead of blocking the event loop.
        connection_request = await run_in_threadpool(
            composio_toolset.initiate_connection,
            entity_id=agent_input.entity,
            app=App.GMAIL,
            integration_id=agent_input.integration_id,
        )

        # Same fixed delays as before, but awaited so that other requests
        # keep being served while this one waits.
        await asyncio.sleep(2)
        print("We are checking your connection, please wait...")
        await asyncio.sleep(10)

        status = connection_request.connectionStatus
        if status == "INITIATED":
            # NOTE(review): the redirect URL is only printed on the server;
            # the HTTP caller never receives it -- confirm this is intended.
            print(connection_request.redirectUrl)
            # Blocking wait (up to 120 s) -> worker thread.
            await run_in_threadpool(
                connection_request.wait_until_active,
                timeout=120,
                client=composio_toolset.client,
            )
            # complete the connection by redirecting the user to the redirectUrl
            print("Connection Status should be active after user completes the authentication process, you can now test by fetching the connection.")
        elif status == "ACTIVE":
            # active connection means the user has completed the authentication process.
            # the API Key entered might still be invalid, you can test by calling the tool.
            print("Connection Status is active, you can now test by calling the tool.")
        else:
            print("Connection process failed, please try again.")
            # Previously the AgentInput class object itself was returned here,
            # which is not a meaningful response; report the failure instead.
            raise HTTPException(
                status_code=502,
                detail="Connection process failed, please try again.",
            )

        # Execute the agent with Langfuse tracking (single shared path for
        # both the freshly-authenticated and the already-active case).
        result = await agent_executor.ainvoke(
            {"input": agent_input.input},
            config=agent_config,
        )
        return {"result": result}
    except HTTPException:
        # Deliberate HTTP errors must not be re-wrapped as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Create test client bound to the FastAPI app defined in this file.
client = TestClient(app)


def test_send_email():
    """Post a sample send-email request to the endpoint and expect HTTP 200.

    The decoded JSON response is printed before the status code is checked.
    """
    response = client.post(
        "/test-agent/invoke",
        json=dict(
            entity="testing-langserve",
            input="Send an email to karthikeya@composio.dev saying hi",
            integration_id="7a9bdf20-34b1-4003-9f97-1705b973a995",
        ),
    )
    print(response.json())
    assert response.status_code == 200


# NOTE(review): this call runs whenever the module is loaded (including when
# it is imported by a server), not only when the file is executed directly.
test_send_email()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment