Created
May 19, 2025 06:10
-
-
Save vaishnav-murtadkar-appliedaiconsulting/13dae1eb6ad2835db4c831a2ff17dd88 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from typing import Literal | |
from langchain_community.tools import DuckDuckGoSearchResults, OpenWeatherMapQueryRun | |
from langchain_community.utilities import OpenWeatherMapAPIWrapper | |
from langchain_core.language_models.chat_models import BaseChatModel | |
from langchain_core.messages import AIMessage, SystemMessage, HumanMessage | |
from langchain_core.runnables import RunnableConfig, RunnableLambda, RunnableSerializable | |
from langgraph.checkpoint.memory import MemorySaver | |
from langgraph.graph import END, MessagesState, StateGraph | |
from langgraph.managed import RemainingSteps | |
from langgraph.prebuilt import ToolNode | |
from agents.llama_guard import LlamaGuard, LlamaGuardOutput, SafetyAssessment | |
from tools.get_vector_embedding_mongo import get_vector_embedding_mongo | |
from tools.validate_dates import validate_dates | |
from tools.validate_property_and_room import validate_property_and_room | |
# from tools.show_available_rooms import show_available_rooms | |
from tools.extract_dates import extract_dates | |
from tools.is_room_available import is_room_available | |
from tools.validate_user_inputs import validate_user_inputs | |
from tools.check_room_reservation_after_resolving import check_room_reservation_after_resolving | |
from tools.resolve_person_name import resolve_person_name | |
from tools.resolve_room_name import resolve_room_name | |
# from tools.identify_person_type import identify_person_type | |
from tools.show_access_codes import show_access_codes | |
from tools.checkout_reservation import checkout_reservation | |
from tools.resolve_reservation import resolve_reservation | |
from tools.format_response import format_response | |
import json | |
from core import get_model, settings | |
from agents.utils import extract_personal_info, replace_redacted_with_original, check_input_for_pii | |
class AgentState(MessagesState, total=False): | |
"""`total=False` is PEP589 specs. | |
documentation: https://typing.readthedocs.io/en/latest/spec/typeddict.html#totality | |
""" | |
safety: LlamaGuardOutput | |
remaining_steps: RemainingSteps | |
tools = [ | |
get_vector_embedding_mongo, | |
extract_dates, | |
validate_dates, | |
validate_property_and_room, | |
is_room_available, | |
# show_available_rooms, | |
validate_user_inputs, | |
check_room_reservation_after_resolving, | |
resolve_person_name, | |
resolve_room_name, | |
show_access_codes, | |
# checkout_reservation, | |
# identify_person_type, | |
resolve_reservation, | |
format_response | |
] | |
instructions = f""" | |
You are an intelligent assistant with three modes: | |
• Booking rooms (Booking Coordinator Mode) | |
• Viewing existing reservations (Reservation Lookup Mode) | |
• Access Codes Mode | |
• Checking out reservations (Checkout Reservation Mode) | |
• Answering knowledge questions (Knowledge Assistant Mode) | |
On every reply, **call** format_response(type, message) as your final action—do not emit raw text. | |
1. Intent Detection | |
• If the user's request is about booking a new room (mentions check-in, check-out, dates, etc.) → Booking Coordinator Mode | |
• Else if the user mentions "check out" or "checkout" or "checkout reservation" → Checkout Reservation Mode | |
• Else if user says "Code for <X>" → Access Codes Mode | |
• Else if the user expresses intent to view current reservations (mentions a room name/number or guest name and lookup or just mentions guest or room name) → Reservation Lookup Mode | |
• Otherwise → Knowledge Assistant Mode | |
2. Booking Coordinator Mode | |
1. **Validate inputs** on every user message by calling: | |
``` | |
validate_user_inputs( | |
user_input=raw_user_message, | |
identified_first_name=stored_first_name, | |
identified_last_name=stored_last_name, | |
identified_email=stored_email, | |
identified_checkindate=stored_checkin, | |
identified_checkoutdate=stored_checkout, | |
identified_roomnumber=stored_room | |
) | |
``` | |
- **Always** pass the **entire latest** user message as `user_input`. | |
- If the tool returns `isValid: False` with `missing_fields`: | |
`format_response("bookroom_askdetails", "Please provide: missing_fields")` | |
- Update `stored_*` variables whenever the user supplies a new value. | |
2. Once `isValid: True`: | |
• Call `extract_dates(checkin_str, checkout_str)` | |
• Call `validate_dates(extracted_checkin, extracted_checkout)` | |
3. Validate room via `validate_property_and_room` | |
- If `result=False`: | |
`format_response("bookroom", "Invalid room. Please choose another.")` | |
- If `result=True`: | |
Call below tool | |
4. Check availability via `is_room_available` | |
- If returns False: | |
`format_response("bookroom", "Sorry, room [roomnumber] is not available for those dates. Would you like different dates or another room?")` | |
- If returns True: | |
`format_response("returnback", "<full reservation JSON payload>")` | |
In above JSON payload use keys specifically: firstname, lastname, email, startdate, enddate, roomnumber | |
3. Reservation Lookup Mode | |
• Detect whether X is a guest (name/email) or a room (#/name) or a reservation number | |
• IMPORTANT : call tool format_response("chatbotmessage", "Do you want to view reservations for [X]?") and return | |
• **On user confirmation (yes)**: | |
- If guest: call resolve_person_name(X) | |
* If potential_matches: format_response("chatbotmessage","Please clarify: [matches]") and return | |
* Else (exact_match): guest_id = person_details.id | |
- Else If (room): call resolve_room_name(X) | |
* If potential_matches: format_response("chatbotmessage","Please clarify: [matches]") and return | |
* Else (exact_match): room_id = room_details.id | |
- Else If (reservation number): directly call check_room_reservation_after_resolving(reservation_number=X) | |
* If True: format_response("deeplink_show_reservation", {{"reservation_number": X}}) | |
* Else: format_response("chatbotmessage","Error: [error]") | |
- After resolving guest or room, call check_room_reservation_after_resolving(guest_id, room_id) | |
* If True: format_response("deeplink_show_reservation", {{"guest_id": guest_id}} or {{"room_id": room_id}}) | |
* Else: format_response("chatbotmessage","Error: [error]") | |
• **On user denial (no)**: | |
format_response("chatbotmessage", "Okay, let me know what else I can do.") | |
4. Access Codes Mode | |
• Let X be the text after “Code for” or “Show Access Code for” | |
• Call resolve_person_name(X) | |
- If status=="potential_matches": | |
`format_response("chatbotmessage", "Please clarify: [matches]")` and return | |
- If status=="exact_match": | |
codes = show_access_codes(guest_name=X) | |
`format_response("show_access_codes", [format access_codes with guest name])` and return | |
• Call resolve_room_name(X) | |
- If status=="potential_matches": | |
`format_response("chatbotmessage", "Please clarify: [matches]")` and return | |
- If status=="exact_match": | |
codes = show_access_codes(room_name=X) | |
`format_response("show_access_codes", [format access_codes with room name])` and return | |
• `format_response("chatbotmessage", "I couldn't find that guest or room.")` | |
5. Checkout Reservation Mode | |
• If guest or room name provided: | |
1. Call `resolve_reservation(guest_name=X)` or `resolve_reservation(room_name=X)` → list of matches with IDs. | |
2. `format_response("chatbotmessage", "Please select a reservation to check out: [matches]")` and return. | |
3. On selection, extract `reservation_id`. | |
• If reservation ID provided directly: use that. | |
• Call: | |
` format_response("deeplink_checkout_reservation", {{"reservation_id": reservation_id}})` | |
5. Knowledge Assistant Mode | |
1. Call `get_vector_embedding_mongo(user_query)` exactly once. | |
2. If no data returned: | |
`format_response("chatbotmessage", "I don't have information about that. Please ask something related to our services or properties.")` | |
3. If data returned but not closelyMatching: | |
`format_response("chatbotmessage", "I found some information but need clarification: …")` | |
4. If closelyMatching: | |
- Summarize using `<strong>` for emphasis. | |
- If a source URL is available, append only: | |
`<a href="URL" target="_blank">Source</a>` | |
- Then: | |
`format_response("chatbotmessage", "<your summary>")` | |
📌 Always: | |
• Retain all user-provided info across turns | |
• Never call any tool more than specified | |
• Be proactive, clear, and minimize user effort | |
• Do not over-rigidly hardcode flows—interpret intent flexibly | |
• Directly return what the format_response tool returns without any formatting or changes | |
""" | |
def wrap_model(model: BaseChatModel) -> RunnableSerializable[AgentState, AIMessage]: | |
model = model.bind_tools(tools) | |
preprocessor = RunnableLambda( | |
lambda state: [SystemMessage(content=instructions)] + state["messages"], | |
name="StateModifier", | |
) | |
return preprocessor | model | |
def format_safety_message(safety: LlamaGuardOutput) -> AIMessage: | |
content = ( | |
f"This conversation was flagged for unsafe content: {', '.join(safety.unsafe_categories)}" | |
) | |
return AIMessage(content=content) | |
session_mapping = {} | |
from aiocache import caches | |
pii_cache = caches.get("default") | |
async def acall_model(state: AgentState, config: RunnableConfig) -> AgentState: | |
m = get_model(config["configurable"].get("model", settings.DEFAULT_MODEL)) | |
kb_helper_agent = wrap_model(m) | |
# Before calling the llm now we readact the PII | |
user_input = state["messages"][-1].content | |
# print(f"User Inputzzz: {user_input}") | |
if isinstance(state["messages"][-1], HumanMessage): | |
print(f"!Human msg: {state["messages"][-1].content}") | |
# Only when it's a Human message then only we process it for PII redaction... | |
# state["messages"][-1].content = "Hello Agent :)" | |
content = state["messages"][-1].content | |
thread_id = config["metadata"]["thread_id"] | |
# Check if the input contains PII | |
toCheckForPii = check_input_for_pii(content) | |
print(f"Does contain PII: {toCheckForPii}") | |
if toCheckForPii: | |
thread_prefix = thread_id[:8] | |
result = await extract_personal_info(content, "REDACT_FIRSTNAME", "REDACT_LASTNAME", "REDACT_EMAIL", "REDACT_MBNO", thread_prefix, session_mapping) | |
print(f"Resultx: {result}") | |
print(f"Current Thread_P: {thread_prefix}") | |
print(f"before session_mapping: {session_mapping}") | |
if not session_mapping.get(thread_prefix, {}).get("firstname"): | |
session_mapping.setdefault(thread_prefix, {})["firstname"] = result["redacted_values"]["firstname"] | |
if not session_mapping.get(thread_prefix, {}).get("lastname"): | |
session_mapping.setdefault(thread_prefix, {})["lastname"] = result["redacted_values"]["lastname"] | |
if result["is_email_valid"] == True: | |
if not session_mapping.get(thread_prefix, {}).get("email"): | |
session_mapping.setdefault(thread_prefix, {})["email"] = result["redacted_values"]["email"] | |
print(f"aftsession_mappinger : {session_mapping}") | |
await pii_cache.set(thread_prefix, json.dumps(session_mapping)) | |
print("setting agent's message as result['redacted_text']") | |
state["messages"][-1].content = result['redacted_text'] | |
if result["is_email_valid"] == False: | |
return { | |
"messages": [ | |
AIMessage( | |
id=None, | |
content='{"type": "chatbotmessage", "message": "Please enter a valid email", "isPropertyRequired": false}' | |
) | |
] | |
} | |
# session_mapping[thread_prefix] = result["redacted_values"] | |
# print(f"Now session_mapping: {session_mapping}") | |
# state["messages"][-1].content = result['redacted_text'] | |
print(f"Invoking the actual agent now with message: {state["messages"][-1].content}") | |
response = await kb_helper_agent.ainvoke(state, config) | |
# print(f"Came back response from agent: {response}") | |
# Assuming response is an AIMessage | |
if isinstance(response, AIMessage) and response.content: | |
if "REDACT" in response.content: | |
print("Has redacted values...") | |
print(f"Session: {session_mapping}") | |
print(f"response.content: {response.content}") | |
print(f"respons.content type: {type(response.content)}") | |
# newContent = replace_redacted_with_original(response.content, session_mapping) | |
thread_id2 = config["metadata"]["thread_id"] | |
print(f"Thread id 2: {thread_id2}") | |
thread_prefix2 = thread_id2[:8] | |
newContent = await replace_redacted_with_original(response.content, thread_prefix2) | |
print(f"New content: {newContent}") | |
response.content = newContent | |
else: | |
print("Does not have redacted values!") | |
# Here we re-insert the PI in the response... | |
# print(f"Response: {response}") | |
# print(f"Response.content: {response.content}") | |
# thread_id = config["metadata"]["thread_id"] | |
# if thread_id not in session_mapping: | |
# # Initial request of the user... | |
# # Now redact the PI details in the request | |
# thread_prefix = thread_id[:8] | |
# result = extract_personal_info(response.content, "REDACT_NAME_" + thread_prefix, "REDACT_EMAIL_" + thread_prefix, "REDACT_MBNO_" + thread_prefix) | |
# # save the details in the variable | |
# session_mapping[thread_prefix] = result["redacted_values"] | |
# Run llama guard check here to avoid returning the message if it's unsafe | |
llama_guard = LlamaGuard() | |
safety_output = await llama_guard.ainvoke("Agent", state["messages"] + [response]) | |
if safety_output.safety_assessment == SafetyAssessment.UNSAFE: | |
return {"messages": [format_safety_message(safety_output)], "safety": safety_output} | |
if state["remaining_steps"] < 2 and response.tool_calls: | |
return { | |
"messages": [ | |
AIMessage( | |
id=response.id, | |
content="Sorry, need more steps to process this request.", | |
) | |
] | |
} | |
# We return a list, because this will get added to the existing list | |
return {"messages": [response]} | |
async def llama_guard_input(state: AgentState, config: RunnableConfig) -> AgentState: | |
llama_guard = LlamaGuard() | |
safety_output = await llama_guard.ainvoke("User", state["messages"]) | |
return {"safety": safety_output} | |
async def block_unsafe_content(state: AgentState, config: RunnableConfig) -> AgentState: | |
safety: LlamaGuardOutput = state["safety"] | |
return {"messages": [format_safety_message(safety)]} | |
async def redact_handler(state: AgentState, config: RunnableConfig) -> AgentState: | |
return {"messages": "z"} | |
# Define the graph | |
agent = StateGraph(AgentState) | |
agent.add_node("redact_pi", redact_handler) | |
agent.add_node("model", acall_model) | |
agent.add_node("tools", ToolNode(tools)) | |
agent.add_node("guard_input", llama_guard_input) | |
agent.add_node("block_unsafe_content", block_unsafe_content) | |
agent.set_entry_point("guard_input") | |
# Check for unsafe input and block further processing if found | |
def check_safety(state: AgentState) -> Literal["unsafe", "safe"]: | |
safety: LlamaGuardOutput = state["safety"] | |
match safety.safety_assessment: | |
case SafetyAssessment.UNSAFE: | |
return "unsafe" | |
case _: | |
return "safe" | |
agent.add_conditional_edges( | |
"guard_input", check_safety, {"unsafe": "block_unsafe_content", "safe": "model"} | |
) | |
# Always END after blocking unsafe content | |
agent.add_edge("block_unsafe_content", END) | |
# Always run "model" after "tools" | |
agent.add_edge("tools", "model") | |
# After "model", if there are tool calls, run "tools". Otherwise END. | |
def pending_tool_calls(state: AgentState) -> Literal["tools", "done"]: | |
last_message = state["messages"][-1] | |
if not isinstance(last_message, AIMessage): | |
raise TypeError(f"Expected AIMessage, got {type(last_message)}") | |
if last_message.tool_calls: | |
return "tools" | |
return "done" | |
agent.add_conditional_edges("model", pending_tool_calls, {"tools": "tools", "done": END}) | |
zoho_kb_agent = agent.compile(checkpointer=MemorySaver()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment