Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save vaishnav-murtadkar-appliedaiconsulting/13dae1eb6ad2835db4c831a2ff17dd88 to your computer and use it in GitHub Desktop.
Save vaishnav-murtadkar-appliedaiconsulting/13dae1eb6ad2835db4c831a2ff17dd88 to your computer and use it in GitHub Desktop.
from datetime import datetime
from typing import Literal
from langchain_community.tools import DuckDuckGoSearchResults, OpenWeatherMapQueryRun
from langchain_community.utilities import OpenWeatherMapAPIWrapper
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, SystemMessage, HumanMessage
from langchain_core.runnables import RunnableConfig, RunnableLambda, RunnableSerializable
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, MessagesState, StateGraph
from langgraph.managed import RemainingSteps
from langgraph.prebuilt import ToolNode
from agents.llama_guard import LlamaGuard, LlamaGuardOutput, SafetyAssessment
from tools.get_vector_embedding_mongo import get_vector_embedding_mongo
from tools.validate_dates import validate_dates
from tools.validate_property_and_room import validate_property_and_room
# from tools.show_available_rooms import show_available_rooms
from tools.extract_dates import extract_dates
from tools.is_room_available import is_room_available
from tools.validate_user_inputs import validate_user_inputs
from tools.check_room_reservation_after_resolving import check_room_reservation_after_resolving
from tools.resolve_person_name import resolve_person_name
from tools.resolve_room_name import resolve_room_name
# from tools.identify_person_type import identify_person_type
from tools.show_access_codes import show_access_codes
from tools.checkout_reservation import checkout_reservation
from tools.resolve_reservation import resolve_reservation
from tools.format_response import format_response
import json
from core import get_model, settings
from agents.utils import extract_personal_info, replace_redacted_with_original, check_input_for_pii
class AgentState(MessagesState, total=False):
    """Graph state: LangGraph's MessagesState plus safety metadata.

    `total=False` is PEP589 specs.
    documentation: https://typing.readthedocs.io/en/latest/spec/typeddict.html#totality
    """

    # Most recent LlamaGuard verdict for this conversation (set by the
    # guard_input node and by acall_model's post-response check).
    safety: LlamaGuardOutput
    # LangGraph managed value: steps remaining before the recursion limit.
    remaining_steps: RemainingSteps
# Tool set bound to the chat model in wrap_model and executed by the graph's
# ToolNode. Commented-out entries are currently disabled — TODO confirm whether
# they should be removed or re-enabled.
tools = [
    get_vector_embedding_mongo,
    extract_dates,
    validate_dates,
    validate_property_and_room,
    is_room_available,
    # show_available_rooms,
    validate_user_inputs,
    check_room_reservation_after_resolving,
    resolve_person_name,
    resolve_room_name,
    show_access_codes,
    # checkout_reservation,
    # identify_person_type,
    resolve_reservation,
    format_response,
]
# System prompt injected ahead of the message history (see wrap_model).
# NOTE: the f-prefix is load-bearing — doubled braces ({{ }}) are f-string
# escapes that render as single literal braces in the final prompt text.
# Fixes: the preamble said "three modes" while listing five, and both the
# Checkout and Knowledge Assistant sections were numbered "5.".
instructions = f"""
You are an intelligent assistant with five modes:
• Booking rooms (Booking Coordinator Mode)
• Viewing existing reservations (Reservation Lookup Mode)
• Access Codes Mode
• Checking out reservations (Checkout Reservation Mode)
• Answering knowledge questions (Knowledge Assistant Mode)
On every reply, **call** format_response(type, message) as your final action—do not emit raw text.
1. Intent Detection
• If the user's request is about booking a new room (mentions check-in, check-out, dates, etc.) → Booking Coordinator Mode
• Else if the user mentions "check out" or "checkout" or "checkout reservation" → Checkout Reservation Mode
• Else if user says "Code for <X>" → Access Codes Mode
• Else if the user expresses intent to view current reservations (mentions a room name/number or guest name and lookup or just mentions guest or room name) → Reservation Lookup Mode
• Otherwise → Knowledge Assistant Mode
2. Booking Coordinator Mode
1. **Validate inputs** on every user message by calling:
```
validate_user_inputs(
user_input=raw_user_message,
identified_first_name=stored_first_name,
identified_last_name=stored_last_name,
identified_email=stored_email,
identified_checkindate=stored_checkin,
identified_checkoutdate=stored_checkout,
identified_roomnumber=stored_room
)
```
- **Always** pass the **entire latest** user message as `user_input`.
- If the tool returns `isValid: False` with `missing_fields`:
`format_response("bookroom_askdetails", "Please provide: missing_fields")`
- Update `stored_*` variables whenever the user supplies a new value.
2. Once `isValid: True`:
• Call `extract_dates(checkin_str, checkout_str)`
• Call `validate_dates(extracted_checkin, extracted_checkout)`
3. Validate room via `validate_property_and_room`
- If `result=False`:
`format_response("bookroom", "Invalid room. Please choose another.")`
- If `result=True`:
Call below tool
4. Check availability via `is_room_available`
- If returns False:
`format_response("bookroom", "Sorry, room [roomnumber] is not available for those dates. Would you like different dates or another room?")`
- If returns True:
`format_response("returnback", "<full reservation JSON payload>")`
In above JSON payload use keys specifically: firstname, lastname, email, startdate, enddate, roomnumber
3. Reservation Lookup Mode
• Detect whether X is a guest (name/email) or a room (#/name) or a reservation number
• IMPORTANT : call tool format_response("chatbotmessage", "Do you want to view reservations for [X]?") and return
• **On user confirmation (yes)**:
- If guest: call resolve_person_name(X)
* If potential_matches: format_response("chatbotmessage","Please clarify: [matches]") and return
* Else (exact_match): guest_id = person_details.id
- Else If (room): call resolve_room_name(X)
* If potential_matches: format_response("chatbotmessage","Please clarify: [matches]") and return
* Else (exact_match): room_id = room_details.id
- Else If (reservation number): directly call check_room_reservation_after_resolving(reservation_number=X)
* If True: format_response("deeplink_show_reservation", {{"reservation_number": X}})
* Else: format_response("chatbotmessage","Error: [error]")
- After resolving guest or room, call check_room_reservation_after_resolving(guest_id, room_id)
* If True: format_response("deeplink_show_reservation", {{"guest_id": guest_id}} or {{"room_id": room_id}})
* Else: format_response("chatbotmessage","Error: [error]")
• **On user denial (no)**:
format_response("chatbotmessage", "Okay, let me know what else I can do.")
4. Access Codes Mode
• Let X be the text after “Code for” or “Show Access Code for”
• Call resolve_person_name(X)
- If status=="potential_matches":
`format_response("chatbotmessage", "Please clarify: [matches]")` and return
- If status=="exact_match":
codes = show_access_codes(guest_name=X)
`format_response("show_access_codes", [format access_codes with guest name])` and return
• Call resolve_room_name(X)
- If status=="potential_matches":
`format_response("chatbotmessage", "Please clarify: [matches]")` and return
- If status=="exact_match":
codes = show_access_codes(room_name=X)
`format_response("show_access_codes", [format access_codes with room name])` and return
• `format_response("chatbotmessage", "I couldn't find that guest or room.")`
5. Checkout Reservation Mode
• If guest or room name provided:
1. Call `resolve_reservation(guest_name=X)` or `resolve_reservation(room_name=X)` → list of matches with IDs.
2. `format_response("chatbotmessage", "Please select a reservation to check out: [matches]")` and return.
3. On selection, extract `reservation_id`.
• If reservation ID provided directly: use that.
• Call:
` format_response("deeplink_checkout_reservation", {{"reservation_id": reservation_id}})`
6. Knowledge Assistant Mode
1. Call `get_vector_embedding_mongo(user_query)` exactly once.
2. If no data returned:
`format_response("chatbotmessage", "I don't have information about that. Please ask something related to our services or properties.")`
3. If data returned but not closelyMatching:
`format_response("chatbotmessage", "I found some information but need clarification: …")`
4. If closelyMatching:
- Summarize using `<strong>` for emphasis.
- If a source URL is available, append only:
`<a href="URL" target="_blank">Source</a>`
- Then:
`format_response("chatbotmessage", "<your summary>")`
📌 Always:
• Retain all user-provided info across turns
• Never call any tool more than specified
• Be proactive, clear, and minimize user effort
• Do not over-rigidly hardcode flows—interpret intent flexibly
• Directly return what the format_response tool returns without any formatting or changes
"""
def wrap_model(model: BaseChatModel) -> RunnableSerializable[AgentState, AIMessage]:
    """Return a runnable that prefixes the system instructions onto the
    message history and feeds the result to *model* with all tools bound.
    """
    tool_model = model.bind_tools(tools)
    # Prepend the system prompt to whatever messages are in graph state.
    add_system_prompt = RunnableLambda(
        lambda state: [SystemMessage(content=instructions)] + state["messages"],
        name="StateModifier",
    )
    return add_system_prompt | tool_model
def format_safety_message(safety: LlamaGuardOutput) -> AIMessage:
    """Build the AI message returned to the user when content is flagged."""
    categories = ", ".join(safety.unsafe_categories)
    return AIMessage(
        content=f"This conversation was flagged for unsafe content: {categories}"
    )
# In-process map of thread prefix -> redacted PII values (firstname/lastname/
# email) collected for that conversation; also persisted to pii_cache below.
session_mapping = {}
# NOTE(review): import should move to the top of the file with the other imports.
from aiocache import caches
# Cache handle used to persist session_mapping per thread prefix — backend is
# whatever the "default" aiocache config points at; confirm configuration.
pii_cache = caches.get("default")
async def acall_model(state: AgentState, config: RunnableConfig) -> AgentState:
    """Invoke the tool-bound chat model, redacting PII on the way in and
    restoring it on the way out.

    Flow:
      1. If the newest message is human and contains PII, redact it in
         place, persist the prefix-keyed mapping to ``pii_cache``, and
         short-circuit with an error reply when the email is invalid.
      2. Invoke the model.
      3. Replace any ``REDACT`` placeholders in the reply with originals.
      4. Safety-check the reply with LlamaGuard; block unsafe output.
      5. Bail out when too few graph steps remain to run pending tool calls.

    Returns a partial state update: ``messages`` (appended by LangGraph)
    and, on an unsafe verdict, ``safety``.
    """
    m = get_model(config["configurable"].get("model", settings.DEFAULT_MODEL))
    kb_helper_agent = wrap_model(m)

    # Only human messages are screened for PII; AI/tool messages pass through.
    if isinstance(state["messages"][-1], HumanMessage):
        content = state["messages"][-1].content
        print(f"!Human msg: {content}")
        thread_id = config["metadata"]["thread_id"]
        contains_pii = check_input_for_pii(content)
        print(f"Does contain PII: {contains_pii}")
        if contains_pii:
            # Session entries are keyed by the first 8 chars of the thread id.
            thread_prefix = thread_id[:8]
            result = await extract_personal_info(
                content,
                "REDACT_FIRSTNAME",
                "REDACT_LASTNAME",
                "REDACT_EMAIL",
                "REDACT_MBNO",
                thread_prefix,
                session_mapping,
            )
            print(f"Resultx: {result}")
            print(f"Current Thread_P: {thread_prefix}")
            print(f"before session_mapping: {session_mapping}")
            # Only fill values not already stored for this thread, so details
            # supplied in earlier turns are never overwritten.
            entry = session_mapping.setdefault(thread_prefix, {})
            if not entry.get("firstname"):
                entry["firstname"] = result["redacted_values"]["firstname"]
            if not entry.get("lastname"):
                entry["lastname"] = result["redacted_values"]["lastname"]
            # assumes is_email_valid is a bool — TODO confirm in extract_personal_info
            if result["is_email_valid"]:
                if not entry.get("email"):
                    entry["email"] = result["redacted_values"]["email"]
            print(f"aftsession_mappinger : {session_mapping}")
            await pii_cache.set(thread_prefix, json.dumps(session_mapping))
            print("setting agent's message as result['redacted_text']")
            state["messages"][-1].content = result["redacted_text"]
            if not result["is_email_valid"]:
                # Short-circuit: ask the user to re-enter a valid email.
                return {
                    "messages": [
                        AIMessage(
                            id=None,
                            content='{"type": "chatbotmessage", "message": "Please enter a valid email", "isPropertyRequired": false}',
                        )
                    ]
                }

    print(f"Invoking the actual agent now with message: {state['messages'][-1].content}")
    response = await kb_helper_agent.ainvoke(state, config)

    # Re-insert the original PII wherever the model echoed REDACT placeholders.
    if isinstance(response, AIMessage) and response.content:
        if "REDACT" in response.content:
            print("Has redacted values...")
            print(f"Session: {session_mapping}")
            print(f"response.content: {response.content}")
            print(f"respons.content type: {type(response.content)}")
            thread_id2 = config["metadata"]["thread_id"]
            print(f"Thread id 2: {thread_id2}")
            thread_prefix2 = thread_id2[:8]
            new_content = await replace_redacted_with_original(response.content, thread_prefix2)
            print(f"New content: {new_content}")
            response.content = new_content
        else:
            print("Does not have redacted values!")

    # Run llama guard check here to avoid returning the message if it's unsafe.
    llama_guard = LlamaGuard()
    safety_output = await llama_guard.ainvoke("Agent", state["messages"] + [response])
    if safety_output.safety_assessment == SafetyAssessment.UNSAFE:
        return {"messages": [format_safety_message(safety_output)], "safety": safety_output}

    # Not enough graph steps left to execute the requested tool calls.
    if state["remaining_steps"] < 2 and response.tool_calls:
        return {
            "messages": [
                AIMessage(
                    id=response.id,
                    content="Sorry, need more steps to process this request.",
                )
            ]
        }

    # We return a list, because this will get added to the existing list.
    return {"messages": [response]}
async def llama_guard_input(state: AgentState, config: RunnableConfig) -> AgentState:
    """Screen the incoming user messages with LlamaGuard and store the verdict."""
    guard = LlamaGuard()
    verdict = await guard.ainvoke("User", state["messages"])
    return {"safety": verdict}
async def block_unsafe_content(state: AgentState, config: RunnableConfig) -> AgentState:
    """Replace the conversation turn with a flagged-content notice."""
    verdict: LlamaGuardOutput = state["safety"]
    return {"messages": [format_safety_message(verdict)]}
async def redact_handler(state: AgentState, config: RunnableConfig) -> AgentState:
    # NOTE(review): placeholder — returns the bare string "z" where a list of
    # messages is expected. The "redact_pi" node this backs has no edges wired
    # into the graph, so it is currently unreachable; confirm intent or remove.
    return {"messages": "z"}
# Define the graph
agent = StateGraph(AgentState)
agent.add_node("redact_pi", redact_handler)  # NOTE(review): node has no edges — unreachable
agent.add_node("model", acall_model)
agent.add_node("tools", ToolNode(tools))
agent.add_node("guard_input", llama_guard_input)
agent.add_node("block_unsafe_content", block_unsafe_content)
# Every turn begins with a LlamaGuard screen of the user's input.
agent.set_entry_point("guard_input")
# Check for unsafe input and block further processing if found
def check_safety(state: AgentState) -> Literal["unsafe", "safe"]:
    """Route on the stored LlamaGuard verdict: "unsafe" blocks, anything else proceeds."""
    verdict: LlamaGuardOutput = state["safety"]
    if verdict.safety_assessment == SafetyAssessment.UNSAFE:
        return "unsafe"
    return "safe"
# Unsafe input is diverted to the blocking node; safe input reaches the model.
agent.add_conditional_edges(
    "guard_input", check_safety, {"unsafe": "block_unsafe_content", "safe": "model"}
)
# Always END after blocking unsafe content
agent.add_edge("block_unsafe_content", END)
# Always run "model" after "tools"
agent.add_edge("tools", "model")
# After "model", if there are tool calls, run "tools". Otherwise END.
def pending_tool_calls(state: AgentState) -> Literal["tools", "done"]:
    """Return "tools" when the last AI message requested tool calls, else "done"."""
    tail = state["messages"][-1]
    if not isinstance(tail, AIMessage):
        raise TypeError(f"Expected AIMessage, got {type(tail)}")
    return "tools" if tail.tool_calls else "done"
agent.add_conditional_edges("model", pending_tool_calls, {"tools": "tools", "done": END})
# Compile with an in-memory checkpointer so per-thread state persists across turns.
zoho_kb_agent = agent.compile(checkpointer=MemorySaver())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment