-
-
Save Karthikeya-Meesala/2bde7f87688d9ea8a2e930b862382a45 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
OPENAI_API_KEY="" | |
COMPOSIO_API_KEY="" | |
LANGFUSE_SECRET_KEY='' | |
LANGFUSE_PUBLIC_KEY='' | |
LANGFUSE_HOST='' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fastapi import FastAPI, HTTPException | |
from fastapi.testclient import TestClient | |
from langchain_core.tools import tool | |
from langchain_core.utils.function_calling import convert_to_openai_tool | |
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages | |
from langchain_openai import ChatOpenAI | |
from langchain_core.runnables.config import RunnableConfig | |
from langfuse import Langfuse | |
from langfuse.callback import CallbackHandler | |
from langserve import add_routes | |
from langchain.agents import AgentExecutor | |
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser | |
from pydantic import BaseModel, Field | |
from langchain_core.prompts import MessagesPlaceholder | |
from composio_langchain import ComposioToolSet, App, Action | |
from langchain.prompts import ChatPromptTemplate | |
from typing import Optional, List | |
import os | |
from dotenv import load_dotenv | |
import time | |
load_dotenv() | |
app = FastAPI(title="My API", version="1.0.0") | |
# Initialize tools | |
composio_toolset = ComposioToolSet() | |
gmail_tools = composio_toolset.get_tools(actions=[Action.GMAIL_SEND_EMAIL]) | |
@tool | |
def word_length(word: str) -> int: | |
"""Returns the length of a word""" | |
return len(word) | |
# Combine all tools | |
tools = [word_length] + gmail_tools | |
print("Create integration in the dashboard") | |
# Initialize Langfuse | |
langfuse = Langfuse() | |
langfuse_handler = CallbackHandler() | |
# Initialize LLM with Langfuse | |
llm = ChatOpenAI() | |
config = RunnableConfig(callbacks=[langfuse_handler]) | |
llm_with_langfuse = llm.with_config(config) | |
# Define input model | |
class AgentInput(BaseModel): | |
input: str = Field(..., description="The input message for the agent") | |
entity: str = Field(..., description="The entity id for the agent") | |
integration_id: str = Field(..., description="The integration id for the agent") | |
class Config: | |
json_schema_extra = { | |
"examples": [ | |
{ | |
"input": "Send an email to karan.v@composio.dev saying hi", | |
"integration_id": "8cf69565-def0-4109-ad72-2051f522035e" | |
} | |
] | |
} | |
# Create prompt template | |
prompt = ChatPromptTemplate.from_messages([ | |
("system", """You are a helpful assistant that can send emails and perform other tasks on gmail. | |
When sending emails, make sure to validate the email address format first."""), | |
("user", "{input}"), | |
MessagesPlaceholder(variable_name="agent_scratchpad"), | |
]) | |
# Initialize LLM with tools | |
llm_with_tools = llm.bind(tools=[convert_to_openai_tool(t) for t in tools]) | |
# Create agent | |
agent = ( | |
{ | |
"input": lambda x: x["input"], | |
"agent_scratchpad": lambda x: format_to_openai_tool_messages( | |
x["intermediate_steps"] | |
), | |
} | |
| prompt | |
| llm_with_tools | |
| OpenAIToolsAgentOutputParser() | |
) | |
# Create agent executor | |
agent_executor = AgentExecutor(agent=agent, tools=tools) | |
# Add Langfuse tracking to agent | |
agent_config = RunnableConfig( | |
{"run_name": "email_agent"}, | |
callbacks=[langfuse_handler] | |
) | |
@app.post("/test-agent/invoke") | |
async def create_item(agent_input: AgentInput): | |
try: | |
# Initiate the connection | |
connection_request = composio_toolset.initiate_connection( | |
entity_id=agent_input.entity, | |
app=App.GMAIL, | |
integration_id=agent_input.integration_id | |
) | |
time.sleep(2) | |
print("We are checking your connection, please wait...") | |
time.sleep(10) | |
if connection_request.connectionStatus == "INITIATED": | |
print(connection_request.redirectUrl) | |
connection_request.wait_until_active(timeout=120, client=composio_toolset.client) | |
# complete the connection by redirecting the user to the redirectUrl | |
print("Connection Status should be active after user completes the authentication process, you can now test by fetching the connection.") | |
# execute agent once authenticated. | |
result = await agent_executor.ainvoke( | |
{"input": agent_input.input}, | |
config=agent_config | |
) | |
return {"result": result} | |
elif connection_request.connectionStatus == "ACTIVE": | |
print("Connection Status is active, you can now test by calling the tool.") | |
# active connection means the user has completed the authentication process. | |
# the API Key entered might still be invalid, you can test by calling the tool. | |
# Execute the agent with Langfuse tracking | |
result = await agent_executor.ainvoke( | |
{"input": agent_input.input}, | |
config=agent_config | |
) | |
return {"result": result} | |
else: | |
print("Connection process failed, please try again.") | |
# Here you would typically save the item to a database | |
return AgentInput | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
# Create test client | |
client = TestClient(app) | |
def test_send_email(): | |
test_payload = { | |
"entity": "testing-langserve", | |
"input": "Send an email to karthikeya@composio.dev saying hi", | |
"integration_id": "7a9bdf20-34b1-4003-9f97-1705b973a995" | |
} | |
response = client.post("/test-agent/invoke", json=test_payload) | |
print(response.json()) | |
assert response.status_code == 200 | |
test_send_email() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment