Skip to content

Instantly share code, notes, and snippets.

@bonadio
Last active April 25, 2024 19:45
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save bonadio/2d548a493907c133bc10de806ecd08af to your computer and use it in GitHub Desktop.
Save bonadio/2d548a493907c133bc10de806ecd08af to your computer and use it in GitHub Desktop.
A very basic implementation of AutoGen with FastAPI, using a WebSocket to interact with the user_proxy agent in a web app
import autogen
from user_proxy_webagent import UserProxyWebAgent
import asyncio
# Model endpoints shared by both agent configurations below.
config_list = [
    {
        "model": "gpt-3.5-turbo",
        # "api_key": "<YOUR KEY HERE>"
    }
]

# JSON-schema description of the arguments accepted by the `search_db` function.
_SEARCH_DB_PARAMETERS = {
    "type": "object",
    "properties": {
        "order_number": {
            "type": "integer",
            "description": "Order number",
        },
        "customer_number": {
            "type": "string",
            "description": "Customer number",
        },
    },
    "required": ["order_number", "customer_number"],
}

# Function declaration advertised to the assistant's model.
_SEARCH_DB_FUNCTION = {
    "name": "search_db",
    "description": "Search database for order status",
    "parameters": _SEARCH_DB_PARAMETERS,
}

# Configuration for the assistant agent: deterministic output plus one callable function.
llm_config_assistant = {
    "model": "gpt-3.5-turbo",
    "temperature": 0,
    "config_list": config_list,
    "functions": [_SEARCH_DB_FUNCTION],
}

# Configuration for the proxy agent: same endpoints, no functions declared.
llm_config_proxy = {
    "model": "gpt-3.5-turbo-0613",
    "temperature": 0,
    "config_list": config_list,
}
#############################################################################################
# This is where you put your AutoGen logic; here there are two simple agents and one function call.
class AutogenChat():
    """One chat session: an assistant agent plus a queue-backed user proxy.

    The two queues are handed to the user proxy via ``set_queues`` and are the
    only channel between the agents and whoever owns the websocket.
    """

    def __init__(self, chat_id=None, websocket=None):
        """Create the queues and both agents for the chat identified by ``chat_id``."""
        self.websocket = websocket
        self.chat_id = chat_id

        # Text typed by the client travels on `client_sent_queue`;
        # text destined for the client travels on `client_receive_queue`.
        self.client_sent_queue = asyncio.Queue()
        self.client_receive_queue = asyncio.Queue()

        assistant_prompt = """You are a helpful assistant, help the user find the status of his order.
Only use the tools provided to do the search. Only execute the search after you have all the information needed.
When you ask a question, always add the word "BRKT"" at the end.
When you responde with the status add the word TERMINATE"""

        def ends_with_terminate(msg):
            # Falsy for empty/missing content, otherwise whether the text ends in TERMINATE.
            content = msg.get("content", "")
            return content and content.rstrip().endswith("TERMINATE")

        self.assistant = autogen.AssistantAgent(
            name="assistant",
            llm_config=llm_config_assistant,
            system_message=assistant_prompt,
        )

        self.user_proxy = UserProxyWebAgent(
            name="user_proxy",
            human_input_mode="ALWAYS",
            max_consecutive_auto_reply=10,
            is_termination_msg=ends_with_terminate,
            code_execution_config=False,
            function_map={
                "search_db": self.search_db,
            },
        )

        # Give the proxy the queues it uses to talk to the client.
        self.user_proxy.set_queues(self.client_sent_queue, self.client_receive_queue)

    async def start(self, message):
        """Open a fresh conversation with the assistant, seeded with ``message``."""
        chat_options = {"clear_history": True, "message": message}
        await self.user_proxy.a_initiate_chat(self.assistant, **chat_options)

    # Mock function call: always reports the order as delivered.
    def search_db(self, order_number=None, customer_number=None):
        """Return a canned order status; both arguments are ignored."""
        return "Order status: delivered TERMINATE"
# to test this code
# add your OpenAI key to the config_list in autogen_chat.py (main.py also requires OPENAI_API_KEY in the environment or a .env file)
# run the following code in the same directory
# python main.py
# access http://localhost:8000
# send the following message:
# send -> What is the status of my order?
# send -> order 111
# send -> customer 222
# the response should be Delivered
# send -> exit to end
# CTRL+C terminate the process
from fastapi import FastAPI, WebSocket, Request
from fastapi.responses import HTMLResponse
import uuid
from autogen_chat import AutogenChat
import asyncio
import uvicorn
from dotenv import load_dotenv, find_dotenv
import openai
import os
# Load settings from the .env file located by find_dotenv(); the return value is discarded.
_ = load_dotenv(find_dotenv()) # read local .env file
# Indexing os.environ raises KeyError at import time when OPENAI_API_KEY is not set.
openai.api_key = os.environ['OPENAI_API_KEY']
# openai.log='debug'
# The FastAPI application that the route decorators below attach to.
app = FastAPI()
# Empty dict stored as a plain attribute on the app; nothing in the visible code reads it.
app.autogen_chat = {}
@app.get("/")
async def get(request: Request):
    """Serve the single-page chat client.

    Every page load receives a fresh chat id. The id is embedded in the page
    and used as the last path segment of the websocket URL the page opens.

    Args:
        request: The incoming request (injected by FastAPI; not otherwise used).

    Returns:
        An ``HTMLResponse`` containing the chat page.
    """
    # uuid4 is purely random; uuid1 would embed this host's network address
    # and a timestamp in an identifier that is shown to the browser.
    chat_id = str(uuid.uuid4())
    # NOTE: this is an f-string, so literal JavaScript braces are doubled.
    html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<h2>Your ID: <span id="ws-id"></span></h2>
<form action="" onsubmit="sendMessage(event)">
<input type="text" id="messageText" autocomplete="off"/>
<button>Send</button>
</form>
<ul id='messages'>
</ul>
<script>
function showMessage(msg) {{
var messages = document.getElementById('messages')
var message = document.createElement('li')
var content = document.createTextNode(msg)
message.appendChild(content)
messages.appendChild(message)
}};
var chat_id = "{chat_id}"
document.querySelector("#ws-id").textContent = chat_id;
var wsScheme = window.location.protocol === "https:" ? "wss" : "ws";
var ws = new WebSocket(wsScheme + "://" + window.location.host + "/ws/" + chat_id);
ws.onmessage = function(event) {{
showMessage(event.data)
}};
function sendMessage(event) {{
var input = document.getElementById("messageText")
ws.send(input.value)
showMessage(input.value)
input.value = ''
event.preventDefault()
}}
</script>
</body>
</html>
"""
    return HTMLResponse(html)
class ConnectionManager:
    """Tracks the chats whose websocket has been accepted."""

    def __init__(self):
        # Chats that went through connect() and have not been disconnected yet.
        self.active_connections: list["AutogenChat"] = []

    async def connect(self, autogen_chat: "AutogenChat"):
        """Accept the chat's websocket, then register the chat as active."""
        await autogen_chat.websocket.accept()
        self.active_connections.append(autogen_chat)

    async def disconnect(self, autogen_chat: "AutogenChat"):
        """Signal the chat's outbound queue to finish and unregister the chat.

        Safe to call for a chat that was never registered (for example when
        ``accept()`` failed) or that was already disconnected.
        """
        # The DO_FINISH sentinel tells the consumer of this queue to stop.
        autogen_chat.client_receive_queue.put_nowait("DO_FINISH")
        print(f"autogen_chat {autogen_chat.chat_id} disconnected")
        try:
            self.active_connections.remove(autogen_chat)
        except ValueError:
            # Not in the list: nothing to unregister.
            pass


manager = ConnectionManager()
async def send_to_client(autogen_chat: AutogenChat):
    """Forward queued replies to the chat's websocket until DO_FINISH arrives.

    Each item taken from ``client_receive_queue`` is sent as a text frame;
    the sentinel string "DO_FINISH" ends the loop without being sent.
    """
    outgoing = autogen_chat.client_receive_queue
    while True:
        reply = await outgoing.get()
        if reply == "DO_FINISH":
            # Acknowledge the sentinel and stop relaying.
            outgoing.task_done()
            return
        await autogen_chat.websocket.send_text(reply)
        outgoing.task_done()
        # Short pause between messages.
        await asyncio.sleep(0.05)
async def receive_from_client(autogen_chat: "AutogenChat"):
    """Pump text frames from the websocket into ``client_sent_queue``.

    The loop ends when the client sends the literal text "DO_FINISH" or when
    ``receive_text()`` raises (for example because the client went away). In
    every case the "DO_FINISH" sentinel is pushed onto both queues on the way
    out, so anything blocked on either queue is released instead of waiting
    forever. An exception from ``receive_text()`` still propagates to the
    caller after the sentinels are queued.
    """
    try:
        while True:
            data = await autogen_chat.websocket.receive_text()
            if data == "DO_FINISH":
                break
            await autogen_chat.client_sent_queue.put(data)
            # Short pause between messages.
            await asyncio.sleep(0.05)
    finally:
        # put_nowait: this must not suspend, since it may run while the task
        # is being cancelled or while an exception is propagating.
        autogen_chat.client_receive_queue.put_nowait("DO_FINISH")
        autogen_chat.client_sent_queue.put_nowait("DO_FINISH")
@app.websocket("/ws/{chat_id}")
async def websocket_endpoint(websocket: WebSocket, chat_id: str):
    """Run one chat over a websocket.

    Accepts the connection, waits for the first text frame, starts the two
    relay tasks (queue -> socket and socket -> queue) and then runs the agent
    conversation seeded with that first frame. On the way out the chat is
    disconnected and both relay tasks are finished, so nothing outlives the
    connection.

    Args:
        websocket: The client connection.
        chat_id: Identifier taken from the URL path.
    """
    # Pre-bind everything the cleanup code inspects, so a failure early in
    # the try block cannot leave these names undefined.
    autogen_chat = None
    sender = None
    receiver = None
    try:
        autogen_chat = AutogenChat(chat_id=chat_id, websocket=websocket)
        await manager.connect(autogen_chat)
        data = await autogen_chat.websocket.receive_text()
        sender = asyncio.create_task(send_to_client(autogen_chat))
        receiver = asyncio.create_task(receive_from_client(autogen_chat))
        await autogen_chat.start(data)
        print("DO_FINISHED")
    except Exception as e:
        print("ERROR", str(e))
    finally:
        if autogen_chat is not None:
            try:
                # Queues the DO_FINISH sentinel that lets the sender drain and stop.
                await manager.disconnect(autogen_chat)
            except Exception as e:
                print("ERROR", str(e))
        relay_tasks = [task for task in (sender, receiver) if task is not None]
        if relay_tasks:
            # The receiver is normally parked on receive_text(); cancel it.
            if receiver is not None:
                receiver.cancel()
            # Give the sender a moment to flush, then cancel whatever is left.
            _, pending = await asyncio.wait(relay_tasks, timeout=1.0)
            for task in pending:
                task.cancel()
            # Collect results so task exceptions are retrieved rather than
            # reported later as "never retrieved".
            await asyncio.gather(*relay_tasks, return_exceptions=True)
# Start the uvicorn server only when this file is executed as a script.
if __name__ == "__main__":
    # host 0.0.0.0 listens on all interfaces, on port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
import autogen
from autogen import Agent, ConversableAgent
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
# termcolor is optional: when it cannot be imported, define a stand-in
# `colored` that leaves the text untouched.
try:
    from termcolor import colored
except ImportError:
    def colored(x, *args, **kwargs):
        """Return ``x`` unchanged, ignoring every other argument."""
        return x
class UserProxyWebAgent(autogen.UserProxyAgent):
    """User proxy whose "human input" is exchanged over asyncio queues.

    Instead of prompting on a terminal, ``a_get_human_input`` pushes the last
    message onto ``client_receive_queue`` and awaits the answer on
    ``client_sent_queue``. Both queues must be supplied with ``set_queues``
    before a conversation starts.
    """

    def __init__(self, *args, **kwargs):
        """Build the agent, then install this class's own reply functions."""
        super().__init__(*args, **kwargs)
        # Discard whatever reply functions the base class registered and
        # register this set instead, ending with the async termination/human
        # check defined below.
        self._reply_func_list = []
        self.register_reply([Agent, None], ConversableAgent.generate_oai_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_code_execution_reply)
        self.register_reply([Agent, None], ConversableAgent.generate_function_call_reply)
        self.register_reply([Agent, None], UserProxyWebAgent.a_check_termination_and_human_reply)

    async def a_check_termination_and_human_reply(
        self,
        messages: Optional[List[Dict]] = None,
        sender: Optional[Agent] = None,
        config: Optional[Any] = None,
    ) -> Tuple[bool, Union[str, Dict, None]]:
        """Check if the conversation should be terminated, and if human reply is provided.

        Args:
            messages: Conversation so far; defaults to the messages stored for ``sender``.
            sender: The agent whose message is being answered.
            config: Unused beyond defaulting to ``self``.

        Returns:
            ``(True, None)`` to stop the conversation, ``(True, reply)`` to send
            the human's reply, or ``(False, None)`` to fall through to the
            remaining reply functions (auto-reply).
        """
        if config is None:
            config = self
        if messages is None:
            messages = self._oai_messages[sender]
        message = messages[-1]
        reply = ""
        no_human_input_msg = ""
        if self.human_input_mode == "ALWAYS":
            reply = await self.a_get_human_input(
                f"Provide feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to end the conversation: "
            )
            no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
            # if the human input is empty, and the message is a termination message, then we will terminate the conversation
            reply = reply if reply or not self._is_termination_msg(message) else "exit"
        else:
            if self._consecutive_auto_reply_counter[sender] >= self._max_consecutive_auto_reply_dict[sender]:
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    terminate = self._is_termination_msg(message)
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                        if terminate
                        else f"Please give feedback to {sender.name}. Press enter to skip and use auto-reply, or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply if reply or not terminate else "exit"
            elif self._is_termination_msg(message):
                if self.human_input_mode == "NEVER":
                    reply = "exit"
                else:
                    # self.human_input_mode == "TERMINATE":
                    reply = await self.a_get_human_input(
                        f"Please give feedback to {sender.name}. Press enter or type 'exit' to stop the conversation: "
                    )
                    no_human_input_msg = "NO HUMAN INPUT RECEIVED." if not reply else ""
                    # if the human input is empty, and the message is a termination message, then we will terminate the conversation
                    reply = reply or "exit"

        # print the no_human_input_msg
        if no_human_input_msg:
            print(colored(f"\n>>>>>>>> {no_human_input_msg}", "red"), flush=True)

        # stop the conversation
        if reply == "exit":
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, None

        # send the human reply
        if reply or self._max_consecutive_auto_reply_dict[sender] == 0:
            # reset the consecutive_auto_reply_counter
            self._consecutive_auto_reply_counter[sender] = 0
            return True, reply

        # increment the consecutive_auto_reply_counter
        self._consecutive_auto_reply_counter[sender] += 1
        if self.human_input_mode != "NEVER":
            print(colored("\n>>>>>>>> USING AUTO REPLY...", "red"), flush=True)

        return False, None

    def set_queues(self, client_sent_queue, client_receive_queue):
        """Attach the queues used to exchange text with the client.

        Args:
            client_sent_queue: Queue this agent reads the client's replies from.
            client_receive_queue: Queue this agent writes messages for the client to.
        """
        self.client_sent_queue = client_sent_queue
        self.client_receive_queue = client_receive_queue

    async def a_get_human_input(self, prompt: str) -> str:
        """Relay the last message to the client and wait for the answer.

        Args:
            prompt: Ignored; the text shown to the client is the last message itself.

        Returns:
            The client's reply, ``"exit"`` when the reply is the "DO_FINISH"
            sentinel, or ``""`` when there is no last message or it has no
            text content. The empty string makes the caller treat the turn
            as "no human input".
        """
        last_message = self.last_message()
        # Guard against a missing message or a missing/empty "content" entry.
        # NOTE(review): assumes last_message() returns a dict or None; confirm against the autogen version in use.
        content = last_message.get("content") if last_message else None
        if not content:
            return ""
        await self.client_receive_queue.put(content)
        reply = await self.client_sent_queue.get()
        if reply == "DO_FINISH":
            return "exit"
        return reply
@bonadio
Copy link
Author

bonadio commented Oct 27, 2023

This is a new version that uses only asyncio and no threads

@bonadio
Copy link
Author

bonadio commented Oct 27, 2023

Check the full demo

Autogen with FastApi backend and React frontend
https://github.com/bonadio/autogenwebdemo

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment