Skip to content

Instantly share code, notes, and snippets.

@Shulyaka
Created January 13, 2024 09:37
Show Gist options
  • Save Shulyaka/81284320c4f3999c176f9145aae9cd8d to your computer and use it in GitHub Desktop.
Save Shulyaka/81284320c4f3999c176f9145aae9cd8d to your computer and use it in GitHub Desktop.
Interaction of two AI agents
#!/usr/bin/env python3
"""
This is an example of a team work of two OpenAI Assistants. They have been given a task to review a test of a new employee candidate. The 'Reviewer' (aka Junior hiring manager) is given the instructions to review the test, and the 'Supervisor' (aka Senior hiring manager) is given the instructions to guide the Reviewer throug the process, to ask him to review section by section, and to check his work. User and Reviewer only talk to Supervisor and not to each other.
As of Jan 2024, it is not perfect and the AI Agents do make mistakes and still require human intervention to complete the task correctly. Howewer this approach significantly reduces the number of interventions required and provides better results because of self-check. The downside is the number of tokens required is several times greater than with a single agent approach.
The instructions are very custom and thus not included. But here is an example:
```supervisor_instructions.md
You are a senior hiring manager of a tech company. Your task is to supervise the work of a junior hiring manager. His task is to review the candidate's answers and rate each answer from 0 to 5 and provide a summarized feedback with total score for each section out of the max score.
The junior hiring manager sometimes makes errors. Sometimes he can't find the answers to the question when in fact it was provided by the candidate, sometimes he rates a wrong question, sometimes he only review part of the section he is asked to review and occasionally make other errors. Your task is to guide him to review all sections of the test one by one and then also ask him to provide a short summary as his final report. Don't stop until the final report is provied or an unrecoverable state detected. Check his response for mistakes and double check his findings. Check if all answers are reviewed. If he says the candidate did not provide the response for a question, double check yourself if the answer is really missing. If the junior hiring manager makes a mistake, ask him to correct it and guide him step-by-step how to do it until he does or until you verify there was no error. Quote the candidate's answer if needed. Don't rate the answers yourself, only verify the work of the junior hiring manager.
Use the appropriate function to talk to the junior hiring manager instead of asking the user.
He is given the same file with the candidate's answers as you.
```
```reviewer_instructions.md
You are a hiring manager of a tech company. Your task is to review the candidate's answers and rate each answer from 0 to 5 and provide a summarized feedback with total score for each section out of the max score. Rate each question independently and honestly. You do not need to insert references to the document. If you find a sign of cheating, please include it into the feedback.
When in doubt, or you find that some background information is missing, you can ask for help from the user, who is your supervisor and a senior hiring manager, but don't take his opinion for granted and verify what he tells you. Be careful to not mistake the answer with part of the question.
<<< job description >>>
Please start by reviwing each section one by one, and once finished, provide a short summary as the final report: candidate name, the results of each section, the strongest points of the candidate, the weakest points of the candidate, and whether we should invite him for an online interview.
Here are some notes on the questions, use them during the review:
<<< Some guides on how to assess each question >>>
```
"""
import logging
from time import sleep
from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
import argparse
import openai
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
logging.getLogger("httpx").setLevel(logging.WARNING)
LOGGER.setLevel(logging.DEBUG)
model = "gpt-4-1106-preview"
supervisor_agent_name = "GPT TQ Reviewer Supervisor"
supervisor_message = "Please find the test of a new candidate. Please ask the Reviewer to review it and then obtain the final report."
reviewer_agent_name = "GPT TQ Reviewer"
reviewer_message = "Please find the test of a new candidate."
class RateLimitExceeded(RuntimeError):
pass
class ServerError(RuntimeError):
pass
RESET = "\x1b[0m"
BOLD = "\x1b[1m"
GREEN = "\x1b[32;20m"
GREEN_BOLD = "\x1b[32;1m"
BLUE = "\x1b[34;20m"
PURPLE = "\x1b[35;20m"
LBLUE = "\x1b[36;20m"
LBLUE_BOLD = "\x1b[36;1m"
parser = argparse.ArgumentParser()
parser.add_argument(
"--filename",
help="PDF file to review",
)
parser.add_argument(
"--continue_last_session",
type=bool,
help="Continue last session",
)
parser.add_argument(
"--supervisor_agent_id",
help="Supervisor assistant ID",
)
parser.add_argument(
"--reviewer_agent_id",
help="Reviewer assistant ID",
)
parser.add_argument(
"--supervisor_thread_id",
help="Supervisor thread ID",
)
parser.add_argument(
"--reviewer_thread_id",
help="Reviewer thread ID",
)
args = parser.parse_args()
reviewer_agent_id = args.reviewer_agent_id
supervisor_agent_id = args.supervisor_agent_id
supervisor_thread_id = args.supervisor_thread_id
reviewer_thread_id = args.reviewer_thread_id
client = openai.OpenAI()
# Get or create Supervisor agent
supervisor_agent = None
if supervisor_agent_id is None:
assistants = client.beta.assistants.list()
while supervisor_agent_id is None:
for assistant in assistants:
if assistant.name == supervisor_agent_name:
supervisor_agent = assistant
supervisor_agent_id = assistant.id
break
if assistants.has_next_page():
assistants = assistants.get_next_page()
else:
break
if supervisor_agent_id is not None and supervisor_agent is None:
try:
supervisor_agent = client.beta.assistants.retrieve(
assistant_id=supervisor_agent_id
)
except Exception as e:
LOGGER.warning(str(e))
supervisor_agent_id = None
if supervisor_agent_id is None:
supervisor_agent = client.beta.assistants.create(
model=model,
instructions=Path("supervisor_instructions.md").read_text(),
name=supervisor_agent_name,
tools=[
{"type": "retrieval"},
{
"type": "function",
"function": {
"name": "send_message",
"parameters": {
"type": "object",
"properties": {
"message": {
"type": "string",
"description": "The message text to send",
}
},
"required": ["message"],
},
"description": "Use this function to communicate to the junior hiring manager, write him a message and get a response",
},
},
],
)
supervisor_agent_id = supervisor_agent.id
# Get or create Reviewer agent
reviewer_agent = None
if reviewer_agent_id is None:
reviewer_agent_id = supervisor_agent.metadata.get("reviewer_agent_id")
if reviewer_agent_id is not None and reviewer_agent is None:
try:
reviewer_agent = client.beta.assistants.retrieve(assistant_id=reviewer_agent_id)
except Exception as e:
LOGGER.warning(str(e))
reviewer_agent_id = None
if reviewer_agent_id is None:
assistants = client.beta.assistants.list()
while reviewer_agent_id is None:
for assistant in assistants:
if assistant.name == reviewer_agent_name:
reviewer_agent = assistant
reviewer_agent_id = assistant.id
break
if assistants.has_next_page():
assistants = assistants.get_next_page()
else:
break
if reviewer_agent_id is not None and reviewer_agent is None:
try:
reviewer_agent = client.beta.assistants.retrieve(assistant_id=reviewer_agent_id)
except Exception as e:
LOGGER.warning(str(e))
reviewer_agent_id = None
if reviewer_agent_id is None:
reviewer_agent = client.beta.assistants.create(
model=model,
instructions=Path("reviewer_instructions.md").read_text(),
name=reviewer_agent_name,
tools=[{"type": "retrieval"}],
)
reviewer_agent_id = reviewer_agent.id
metadata = supervisor_agent.metadata
if metadata.get("reviewer_agent_id") != reviewer_agent_id:
metadata.update({"reviewer_agent_id": reviewer_agent_id})
supervisor_agent = client.beta.assistants.update(
assistant_id=supervisor_agent_id, metadata=metadata
)
# Get last supervisor thread
supervisor_thread = None
if supervisor_thread_id is None:
supervisor_thread_id = supervisor_agent.metadata.get("supervisor_thread_id")
if supervisor_thread_id is not None:
try:
supervisor_thread = client.beta.threads.retrieve(thread_id=supervisor_thread_id)
except Exception as e:
LOGGER.warning(str(e))
supervisor_thread_id = None
# Get last reviewer thread
reviewer_thread = None
if reviewer_thread_id is None and supervisor_thread is not None:
reviewer_thread_id = supervisor_thread.metadata.get("reviewer_thread_id")
if reviewer_thread_id is not None:
try:
reviewer_thread = client.beta.threads.retrieve(thread_id=reviewer_thread_id)
except Exception as e:
LOGGER.warning(str(e))
supervisor_thread_id = None
# Create new threads
if args.continue_last_session:
if supervisor_thread is None or reviewer_thread is None:
raise RuntimeError("Cannot find the thread ID to continue the last session")
else:
file_id = None
if supervisor_thread is not None:
file_id = supervisor_thread.metadata.get("file_id")
if file_id is not None:
try:
client.files.delete(file_id=file_id)
except Exception:
pass
client.beta.threads.delete(thread_id=supervisor_thread.id)
if reviewer_thread is not None:
client.beta.threads.delete(thread_id=reviewer_thread.id)
file = client.files.create(file=open(args.filename, "rb"), purpose="assistants")
reviewer_thread = client.beta.threads.create(
messages=[
openai.types.beta.thread_create_params.Message(
content=reviewer_message,
role="user",
file_ids=[file.id],
)
]
)
supervisor_thread = client.beta.threads.create(
messages=[
openai.types.beta.thread_create_params.Message(
content=supervisor_message,
role="user",
file_ids=[file.id],
)
],
metadata={"reviewer_thread_id": reviewer_thread.id, "file_id": file.id},
)
metadata = supervisor_agent.metadata
metadata.update({"supervisor_thread_id": supervisor_thread.id})
supervisor_agent = client.beta.assistants.update(
assistant_id=supervisor_agent_id, metadata=metadata
)
LOGGER.debug(
"Supervisor debug URL: https://platform.openai.com/playground?assistant=%s&mode=assistant&thread=%s",
supervisor_agent_id,
supervisor_thread.id,
)
LOGGER.debug(
"Reviewer debug URL: https://platform.openai.com/playground?assistant=%s&mode=assistant&thread=%s",
reviewer_agent_id,
reviewer_thread.id,
)
if args.continue_last_session:
messages = client.beta.assistants.list(thread_id=supervisor_thread.id)
while True:
for message in messages:
LOGGER.info(message)
if messages.has_next_page():
messages = messages.get_next_page()
else:
break
else:
LOGGER.info(
"User to Reviewer:\nPlease find the test of a new candidate. [File ID %s]",
file.id,
)
LOGGER.info(
"User to Supervisor:\nPlease find the test of a new candidate. Please ask the junior hiring manager to review it and then obtain the final report. [File ID %s]",
file.id,
)
# Run the supervisor thread
supervisor_run = client.beta.threads.runs.create(
thread_id=supervisor_thread.id,
assistant_id=supervisor_agent_id,
)
while True:
supervisor_logged_steps = []
while True:
supervisor_run = client.beta.threads.runs.retrieve(
thread_id=supervisor_thread.id, run_id=supervisor_run.id
)
steps = [
step
for step in client.beta.threads.runs.steps.list(
thread_id=supervisor_thread.id, run_id=supervisor_run.id
)
if step.status == "completed" and step.id not in supervisor_logged_steps
]
supervisor_logged_steps.extend([step.id for step in steps])
for step in steps:
if step.type == "message_creation":
message = client.beta.threads.messages.retrieve(
thread_id=supervisor_thread.id,
message_id=step.step_details.message_creation.message_id,
)
if (
message.role == "assistant"
and message.content[0].type == "text"
and message.content[0].text.value
):
LOGGER.info(
"Supervisor to User:\n%s%s%s",
GREEN_BOLD,
message.content[0].text.value,
RESET,
)
elif step.type == "tool_calls":
for tool_call in step.step_details.tool_calls:
if (
tool_call["type"]
if isinstance(tool_call, dict)
else tool_call.type
) == "retrieval":
LOGGER.info(
"%s*Supervisor is checking the file*%s", PURPLE, RESET
)
if supervisor_run.status in ("queued", "in_progress"):
sleep(3)
continue
elif (
supervisor_run.status == "requires_action"
and supervisor_run.required_action.type == "submit_tool_outputs"
):
tool_outputs = []
for (
supervisor_tool_call
) in supervisor_run.required_action.submit_tool_outputs.tool_calls:
assert supervisor_tool_call.type == "function"
assert supervisor_tool_call.function.name == "send_message"
# Supervisor sends message to Reviewer, run the reviewer thread
message = json_loads(supervisor_tool_call.function.arguments)["message"]
LOGGER.info("Supervisor to Reviewer:\n%s%s%s", LBLUE, message, RESET)
client.beta.threads.messages.create(
thread_id=reviewer_thread.id,
role="user",
content=message,
)
reviewer_run = client.beta.threads.runs.create(
thread_id=reviewer_thread.id,
assistant_id=reviewer_agent_id,
)
reviewer_logged_steps = []
messages = []
while reviewer_run.status in ("queued", "in_progress"):
sleep(3)
reviewer_run = client.beta.threads.runs.retrieve(
thread_id=reviewer_thread.id,
run_id=reviewer_run.id,
)
steps = [
step
for step in client.beta.threads.runs.steps.list(
thread_id=reviewer_thread.id, run_id=reviewer_run.id
)
if step.status == "completed"
and step.id not in reviewer_logged_steps
]
reviewer_logged_steps.extend([step.id for step in steps])
for step in steps:
if step.type == "message_creation":
message = client.beta.threads.messages.retrieve(
thread_id=reviewer_thread.id,
message_id=step.step_details.message_creation.message_id,
)
if (
message.role == "assistant"
and message.content[0].type == "text"
and message.content[0].text.value
):
LOGGER.info(
"Reviewer to Supervisor:\n%s%s%s",
LBLUE_BOLD,
message.content[0].text.value,
RESET,
)
messages.append(message.content[0].text.value)
elif step.type == "tool_calls":
for tool_call in step.step_details.tool_calls:
if (
tool_call["type"]
if isinstance(tool_call, dict)
else tool_call.type
) == "retrieval":
LOGGER.info(
"%s*Reviewer is checking the file*%s",
BLUE,
RESET,
)
if reviewer_run.status != "completed":
if reviewer_run.last_error:
if reviewer_run.last_error.code == "rate_limit_exceeded":
raise RateLimitExceeded(reviewer_run.last_error.message)
elif reviewer_run.last_error.code == "server_error":
raise ServerError(reviewer_run.last_error.message)
raise RuntimeError(str(reviewer_run.last_error))
raise RuntimeError("reviewer_run status = " + reviewer_run.status)
# Save Reviewer respose to Supervisor
tool_outputs.append(
openai.types.beta.threads.run_submit_tool_outputs_params.ToolOutput(
tool_call_id=supervisor_tool_call.id,
output=json_dumps(
{
"response": messages
if len(messages) != 1
else messages[0]
}
),
)
)
client.beta.threads.runs.submit_tool_outputs(
thread_id=supervisor_thread.id,
run_id=supervisor_run.id,
tool_outputs=tool_outputs,
)
elif supervisor_run.status == "completed":
break
else:
if supervisor_run.last_error:
if supervisor_run.last_error.code == "rate_limit_exceeded":
raise RateLimitExceeded(supervisor_run.last_error.message)
elif supervisor_run.last_error.code == "server_error":
raise ServerError(supervisor_run.last_error.message)
raise RuntimeError(str(supervisor_run.last_error))
raise RuntimeError("supervisor_run status = " + supervisor_run.status)
LOGGER.info("User to Supervisor:")
message = input("> ")
if not message:
break
client.beta.threads.messages.create(
thread_id=supervisor_thread.id,
role="user",
content=message,
)
supervisor_run = client.beta.threads.runs.create(
thread_id=supervisor_thread.id,
assistant_id=supervisor_agent_id,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment