Created
June 27, 2025 12:48
-
-
Save HrishikeshChaudhari24/a712e7c11b4e178a2bf17221bfebb06c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import logging | |
import threading | |
import time | |
import uuid | |
from datetime import datetime, timedelta | |
from typing import Any, Dict, List, Optional,Set | |
import uvicorn | |
from fastapi import FastAPI | |
from fastapi.responses import RedirectResponse | |
from langgraph.graph import END, StateGraph | |
from pydantic import BaseModel | |
import sqlite3 | |
from agent.models.database import ( | |
cleanup_expired_credentials, | |
delete_user_credentials, | |
load_conversation_state, | |
load_user_credentials, | |
reset_database, | |
save_conversation_state, | |
store_user_credentials, | |
check_database_health, | |
user_credentials, | |
) | |
from agent.models.calendar import ( | |
book_appointment, | |
check_availability, | |
complete_oauth_flow, | |
generate_auth_url, | |
get_calendar_service, | |
get_user_appointments, | |
get_user_timezone, | |
parse_time_range, | |
init_global_appointments_db, | |
check_global_slot_availability, | |
get_available_slots_for_date, | |
book_global_appointment, | |
get_all_global_appointments, | |
get_conflicting_appointments | |
) | |
init_global_appointments_db()
import os

import google.generativeai as genai
from dateutil import parser
from groq import Groq

# SECURITY: API keys must never be committed to source control. They are read
# from the environment instead; any key that was previously hard-coded in this
# file has been exposed and should be revoked and rotated.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
# Gemini models to try, in order of preference.
GEMINI_MODELS = [
    "gemini-2.0-flash",
    "gemini-1.5-flash",
]
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)
logger = logging.getLogger(__name__)
app = FastAPI()
def call_llama_groq(user_input: str) -> dict:
    """Send *user_input* to the Groq-hosted Llama model as a single user message.

    Args:
        user_input: Text forwarded verbatim as the only chat message.

    Returns:
        ``{"intent": "unknown", "response": <model text>}`` on success, or
        ``{"intent": "unknown", "error": <description>}`` when building the
        client or performing the request fails.
    """
    try:
        # Client construction is inside the ``try`` so that a missing or
        # rejected API key yields the error dict instead of an exception.
        client = Groq(api_key=GROQ_API_KEY)
        chat_completion = client.chat.completions.create(
            messages=[{"role": "user", "content": user_input}],
            model="llama-3.3-70b-versatile",
            stream=False,
        )
        content = chat_completion.choices[0].message.content
    except Exception as e:
        return {"intent": "unknown", "error": f"Llama fallback failed: {e}"}
    return {"intent": "unknown", "response": content}
def fallback_parse_input(user_input: str, today: str) -> dict:
    """Keyword/regex parser used when the Gemini call fails.

    Args:
        user_input: Raw user message.
        today: Current date as ``YYYY-MM-DD``. Returned as-is for "today" and
            used as the base for "tomorrow" so both agree on the same clock.

    Returns:
        A dict with the keys ``intent``, ``date``, ``time_range`` (always
        ``None``), ``specific_time`` (``"HH:MM AM/PM"`` or ``None``) and
        ``email`` (always ``None``).
    """
    import re

    text = user_input.lower()

    # Basic intent detection. Only "my schedule" counts as a request to view
    # appointments; a bare "schedule" is a booking verb ("schedule a call").
    if any(word in text for word in ("show", "my appointments", "meetings", "my schedule")):
        intent = "show_appointments"
    elif any(word in text for word in ("book", "schedule", "appointment")):
        intent = "book_appointment"
    elif any(word in text for word in ("check", "availability", "free")):
        intent = "check_availability"
    else:
        intent = "unknown"

    # Basic date parsing
    date = None
    if "today" in text:
        date = today
    elif "tomorrow" in text:
        try:
            base = datetime.strptime(today, "%Y-%m-%d")
        except (TypeError, ValueError):
            # ``today`` was not a usable date string; fall back to the server clock.
            base = datetime.now()
        date = (base + timedelta(days=1)).strftime("%Y-%m-%d")

    # Basic time parsing: "11 pm" / "11:00 pm" first, then 24-hour "23:00".
    specific_time = None
    time_patterns = [
        r'(\d{1,2}):?(\d{0,2})\s*(pm|am)\b',
        r'(\d{1,2}):(\d{2})',
    ]
    for pattern in time_patterns:
        match = re.search(pattern, text)
        if not match:
            continue
        hour = int(match.group(1))
        minute = int(match.group(2) or 0)
        meridiem = match.group(3).upper() if match.re.groups >= 3 else None
        specific_time = _format_12_hour(hour, minute, meridiem)
        if specific_time:
            break

    return {
        "intent": intent,
        "date": date,
        "time_range": None,
        "specific_time": specific_time,
        "email": None,
    }


def _format_12_hour(hour: int, minute: int, meridiem: Optional[str] = None) -> Optional[str]:
    """Return ``"HH:MM AM/PM"`` for a clock time, or ``None`` if it is out of range.

    When *meridiem* is missing, or *hour* is outside 1-12, the hour is treated
    as a 24-hour value (so 0 becomes 12 AM and 14 becomes 02 PM).
    """
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        return None
    if meridiem is None or hour == 0 or hour > 12:
        meridiem = "AM" if hour < 12 else "PM"
        hour = hour % 12 or 12
    return f"{hour:02d}:{minute:02d} {meridiem}"
def call_gemini(user_input: str, context: Dict[str, Any] | None = None, retries: int = 3):
    """Ask Gemini to classify *user_input* and extract date/time/email fields.

    Each model in ``GEMINI_MODELS`` is tried in order. A request is retried
    with exponential back-off only when the error text contains "429"; any
    other error moves on to the next model. If no model yields a JSON object,
    the result of ``fallback_parse_input`` is returned instead.

    Args:
        user_input: Raw user message. NOTE(review): it is interpolated into
            the prompt unescaped, so it can influence the instructions.
        context: Optional dict; when it has a truthy ``last_intent`` its
            ``last_intent``/``date``/``time_range`` are added to the prompt.
        retries: Maximum attempts per model.

    Returns:
        A dict parsed from the model's JSON reply, or the fallback parser's dict.
    """
    import pytz

    logger.info("Calling Gemini API with input: %s", user_input)

    # Assume Indian timezone for now, but this should come from user settings
    user_tz = pytz.timezone('Asia/Kolkata')
    now = datetime.now(user_tz)
    today = now.strftime("%Y-%m-%d")
    tomorrow = (now + timedelta(days=1)).strftime("%Y-%m-%d")
    current_time = now.strftime("%I:%M %p")
    context_str = (
        f"Previous conversation context: Intent={context['last_intent']}, Date={context.get('date')}, Time={context.get('time_range')}"
        if context and context.get("last_intent")
        else ""
    )
    appointment_intents = [
        "show my appointments",
        "my meetings",
        "my calendar",
        "my schedule",
        "upcoming appointments",
        "what meetings do i have",
        "show appointments",
        "view my schedule"
    ]
    prompt = f"""
You are an appointment booking assistant. Analyze the user input and return a JSON object.
CURRENT CONTEXT:
- Current date: {today}
- Current time: {current_time}
- User timezone: Asia/Kolkata
{context_str}
IMPORTANT DATE/TIME PARSING RULES:
1. "today" = {today}
2. "tomorrow" = {tomorrow}
3. When user says "11 pm" or "11:00 pm", convert to "11:00 PM" format
4. When user says "23:00", convert to "11:00 PM" format
5. DO NOT convert times to UTC - keep them in user's local timezone
6. For booking appointments, if only a specific time is given (like "11 pm"), create a 30-minute slot
User input: "{user_input}"
Appointment-related intents: {appointment_intents}
If the user's message matches any of these, set "intent": "show_appointments".
Return JSON with:
- "intent": One of ["show_appointments", "book_appointment", "check_availability", "greeting", "unknown"]
- "date": Date in YYYY-MM-DD format (e.g., "{today}" for today) or null
- "time_range": {{"start": "HH:MM AM/PM", "end": "HH:MM AM/PM"}} or null
- "specific_time": Single time slot in 12-hour format (e.g., "11:00 PM") or null
- "email": Email address if mentioned or null
EXAMPLES:
Input: "book appointment today 11 pm"
Output: {{
"intent": "book_appointment",
"date": "{today}",
"time_range": null,
"specific_time": "11:00 PM",
"email": null
}}
Input: "book meeting tomorrow 2-4 PM"
Output: {{
"intent": "book_appointment",
"date": "{tomorrow}",
"time_range": {{"start": "2:00 PM", "end": "4:00 PM"}},
"specific_time": null,
"email": null
}}
Input: "show my appointments"
Output: {{
"intent": "show_appointments",
"date": null,
"time_range": null,
"specific_time": null,
"email": null
}}
"""
    for model_name in GEMINI_MODELS:
        try:
            model = genai.GenerativeModel(model_name)
        except Exception as e:
            logger.warning("Model %s unavailable: %s", model_name, e)
            continue
        for attempt in range(retries):
            try:
                response = model.generate_content(prompt)
                if not response or not response.text:
                    continue
                result = json.loads(_strip_code_fence(response.text))
                # Callers use ``.get`` on the result, so anything other than a
                # JSON object is treated as a failed reply.
                if not isinstance(result, dict):
                    raise ValueError("expected a JSON object from Gemini")
                logger.info("Gemini parsed result: %s", result)
                return result
            except Exception as e:
                if "429" in str(e) and attempt < retries - 1:
                    # Rate limited: back off 1s, 2s, 4s, ... before retrying.
                    time.sleep(2 ** attempt)
                    continue
                logger.warning("Gemini error (%s): %s", model_name, e)
                break
    # Fallback to simple parsing if Gemini fails
    logger.warning("Gemini failed, using fallback parsing")
    return fallback_parse_input(user_input, today)


def _strip_code_fence(text: str) -> str:
    """Remove a surrounding Markdown code fence (with or without a ``json`` tag)."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`").strip()
        if cleaned[:4].lower() == "json":
            cleaned = cleaned[4:].strip()
    return cleaned
class BookingRequest(BaseModel):
    """Request body accepted by the ``/process_input`` endpoint."""

    # Raw text the user typed for this turn.
    user_input: str
    # Key used to load and save the conversation state between turns.
    conversation_id: str
    # Optional; when omitted, the email stored in the conversation state is used.
    user_email: Optional[str] = None
from typing_extensions import TypedDict | |
class AgentState(TypedDict, total=False):
    """State dict passed between the workflow nodes; every key is optional."""

    # Raw text of the current user turn.
    user_input: str
    # Identifier under which the state is persisted.
    conversation_id: str
    # Email of the user, supplied by the request or extracted from the input.
    user_email: str
    # Intent detected for the current turn (e.g. "book_appointment").
    intent: str
    # Requested date; the LLM prompt asks for YYYY-MM-DD.
    date: Optional[str]
    # Requested range as {"start": ..., "end": ...}.
    time_range: Optional[Dict[str, str]]
    # Single requested time; converted to a 30-minute range during validation.
    specific_time: Optional[str]
    # Slots offered to the user, formatted as "start - end".
    available_slots: List[str]
    # Initialised by process_input; not written by any node visible here.
    confirmed_slot: str
    # Chat history as {"role": ..., "content": ...} entries.
    messages: List[Dict[str, str]]
    # Intent of the most recent turn, passed to the LLM as context.
    last_intent: str
    # True when the workflow must stop and wait for another user message.
    needs_more_info: bool
    # True when the user has to complete OAuth before continuing.
    auth_required: bool
    # OAuth URL generated for the user when auth is required.
    auth_url: str
def email_collection_node(state: Dict[str, Any]):
    """Ensure the state has a user email and decide whether OAuth is required.

    This is the workflow entry point. If no email is known it is extracted
    from ``state['user_input']`` by regex, then via ``call_gemini``; if neither
    finds one, the user is asked for it and ``needs_more_info`` is set. With an
    email available, cached credentials are checked by building a calendar
    service; on missing or failing credentials ``auth_required`` is set and an
    auth URL is generated.

    Args:
        state: Mutable workflow state; must contain ``user_input`` when no
            ``user_email`` is present.

    Returns:
        The same ``state`` dict, updated in place.
    """
    state['messages'] = state.get('messages', [])

    # ALWAYS check for authentication, regardless of intent
    if not state.get('user_email'):
        import re

        # The TLD class is [A-Za-z]; a "|" inside a character class would be
        # matched literally and accept addresses such as "a@b.c|m".
        email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
        email_matches = re.findall(email_pattern, state['user_input'])
        if email_matches:
            state['user_email'] = email_matches[0]
            logger.info("Email extracted: %s", state['user_email'])
        else:
            # Try Gemini to extract email
            gemini_result = call_gemini(state['user_input'])
            if gemini_result.get('email'):
                state['user_email'] = gemini_result['email']
                logger.info("Email extracted via Gemini: %s", state['user_email'])
            else:
                # Ask for email - even for simple greetings
                greeting_response = "Hi! I'm your AI appointment booking assistant. "
                # Whole-word match so that e.g. "this" is not taken for "hi".
                if re.search(r"\b(hi|hello)\b", state['user_input'].lower()):
                    greeting_response += "Nice to meet you! "
                greeting_response += "To access your calendar and help with appointments, please provide your email address."
                state['messages'].append({
                    'role': 'assistant',
                    'content': greeting_response
                })
                state['needs_more_info'] = True
                return state

    # An email is known from here on: check for valid cached credentials.
    if load_user_credentials(state['user_email']):
        try:
            # Building the service is only a validity check; the object is unused.
            get_calendar_service(state['user_email'])
        except Exception as e:
            logger.error("Cached credentials failed for %s: %s", state['user_email'], e)
            delete_user_credentials(state['user_email'])
            _require_oauth(state)
        else:
            state['needs_more_info'] = False
            state['auth_required'] = False
            logger.info("Using cached credentials for %s", state['user_email'])
    else:
        _require_oauth(state)
        logger.info("No cached credentials for %s, requiring auth", state['user_email'])
    return state


def _require_oauth(state: Dict[str, Any]) -> None:
    """Flag *state* as needing OAuth and attach a freshly generated auth URL."""
    state['auth_required'] = True
    state['needs_more_info'] = False
    try:
        state['auth_url'] = generate_auth_url(state['user_email'])
    except Exception as auth_e:
        # Best effort: the flag stays set even if no URL could be produced.
        logger.error("Failed to generate auth URL: %s", auth_e)
def intent_detection_node(state: Dict[str, Any]):
    """Classify the current user turn with ``call_gemini`` and merge the result.

    Does nothing when the state is waiting for more info or for OAuth.
    ``date`` and ``time_range`` keep their previous value when the model
    returns nothing for them; ``specific_time`` is always overwritten.
    """
    waiting = state.get("needs_more_info") or state.get("auth_required")
    if waiting:
        return state

    previous_turn = {
        "last_intent": state.get("last_intent", ""),
        "date": state.get("date"),
        "time_range": state.get("time_range"),
    }
    parsed = call_gemini(state["user_input"], previous_turn)

    detected = parsed.get("intent", "unknown")
    state["intent"] = detected
    for carried_key in ("date", "time_range"):
        state[carried_key] = parsed.get(carried_key) or state.get(carried_key)
    state["specific_time"] = parsed.get("specific_time")
    state["last_intent"] = detected
    return state
def validation_node(state: Dict[str, Any]):
    """Check that the detected intent has everything it needs to proceed.

    Behaviour by intent:
      * greeting (detected intent, or a whole-word greeting when no actionable
        intent was detected): reply with a capability summary and wait.
      * ``show_appointments``: nothing further is required.
      * ``book_appointment`` / ``check_availability``: ask for a missing date
        or time; a lone ``specific_time`` becomes a 30-minute ``time_range``.
      * anything else: reply with usage hints and wait.

    Args:
        state: Mutable workflow state.

    Returns:
        The same ``state`` dict, with ``needs_more_info`` set accordingly.
    """
    # If we're waiting for OAuth or need more info (like email), skip validation
    if state.get("auth_required") or state.get("needs_more_info"):
        return state

    state["messages"] = state.get("messages", [])
    intent = state.get("intent")
    user_input_lower = (state.get("user_input") or "").lower()

    # A greeting word must not hijack a request whose intent was recognised
    # ("hi, book a meeting tomorrow"), and must match as a whole word so that
    # "this", "which" or "they" are not mistaken for "hi"/"hey".
    actionable = intent in ("show_appointments", "book_appointment", "check_availability")
    if intent == "greeting" or (not actionable and _is_greeting(user_input_lower)):
        _ask(state, "Hello! I'm your appointment assistant. I can help you:\n\n• Book new appointments\n• Check your availability\n• Show your upcoming meetings\n\nWhat would you like to do today?")
        return state

    if intent == "show_appointments":
        state["needs_more_info"] = False
        return state

    if intent not in ("book_appointment", "check_availability"):
        # Unknown intent - provide helpful guidance
        _ask(state, "I can help you with appointments! Try saying something like:\n\n• 'Book a meeting tomorrow at 3 PM'\n• 'Check my availability Friday afternoon'\n• 'Show my upcoming appointments'\n\nWhat would you like to do?")
        return state

    if not state.get("date"):
        _ask(state, "I'd be happy to help! What date would you like? You can say something like 'tomorrow', 'Friday', or a specific date.")
        return state

    if not state.get("time_range") and not state.get("specific_time"):
        _ask(state, f"Great! For {state['date']}, what time works for you? You can specify a range like '2-4 PM' or a specific time like '3 PM'.")
        return state

    # Convert specific_time to time_range if needed
    if state.get('specific_time') and not state.get('time_range'):
        try:
            state['time_range'] = _specific_time_to_range(state['specific_time'])
        except ValueError as e:
            logger.error("Time conversion error: %s", e)
            _ask(state, "Sorry, I couldn't understand the time format. Please specify like '8:00 PM', '8 PM', or '20:00'.")
            return state
        logger.info("Converted specific time %s to range %s", state['specific_time'], state['time_range'])

    state["needs_more_info"] = False
    return state


def _ask(state: Dict[str, Any], content: str) -> None:
    """Append an assistant message and mark the state as waiting for the user."""
    state["messages"].append({"role": "assistant", "content": content})
    state["needs_more_info"] = True


def _is_greeting(text: str) -> bool:
    """Return True when lower-cased *text* contains a greeting as a whole word."""
    import re

    return re.search(r"\b(hi|hello|hey|good morning|good afternoon)\b", text) is not None


def _specific_time_to_range(time_str: Any, minutes: int = 30) -> Dict[str, str]:
    """Turn a single time into a ``{"start", "end"}`` range of *minutes* length.

    Accepts "8:00 PM", "8:00PM", "8 PM", "8PM" and 24-hour "20:00".
    NOTE: a slot that crosses midnight yields an end time earlier than its start.

    Raises:
        ValueError: if *time_str* is not a string in a supported format.
    """
    if not isinstance(time_str, str):
        raise ValueError(f"Unsupported time value: {time_str!r}")
    cleaned = " ".join(time_str.upper().split())
    for fmt in ("%I:%M %p", "%I:%M%p", "%I %p", "%I%p", "%H:%M"):
        try:
            start = datetime.strptime(cleaned, fmt)
        except ValueError:
            continue
        end = start + timedelta(minutes=minutes)
        return {
            "start": start.strftime("%I:%M %p"),
            "end": end.strftime("%I:%M %p"),
        }
    raise ValueError(f"Unsupported time format: {time_str!r}")
def availability_node(state: Dict[str, Any]):
    """Check the requested slot against the global appointment store.

    With a date and time range: report the slot as available, or list up to
    five alternatives for that date (or say there are none) and wait for the
    user. With ``check_availability`` and only a date: list up to ten free
    slots for that date.

    Args:
        state: Mutable workflow state.

    Returns:
        The same ``state`` dict with ``messages``, ``available_slots`` and
        ``needs_more_info`` updated.
    """
    if state.get("needs_more_info") or state.get("auth_required"):
        return state
    state.setdefault("messages", [])

    intent = state.get("intent")
    date = state.get("date")
    time_range = state.get("time_range")

    if intent in ["book_appointment", "check_availability"] and date and time_range:
        # The range comes from an LLM reply, so its shape is not guaranteed.
        start_time = time_range.get("start") if isinstance(time_range, dict) else None
        end_time = time_range.get("end") if isinstance(time_range, dict) else None
        if not start_time or not end_time:
            state["messages"].append({
                "role": "assistant",
                "content": "Sorry, I couldn't understand the time range. Please give a start and end time, like '2:00 PM - 2:30 PM'.",
            })
            state["needs_more_info"] = True
            return state

        # Check if the specific requested slot is available
        if check_global_slot_availability(date, start_time, end_time):
            state["available_slots"] = [f"{start_time} - {end_time}"]
            state["messages"].append({
                "role": "assistant",
                "content": f"Great! The slot {start_time} - {end_time} on {date} is available. Would you like me to book it for you?",
            })
            return state

        # Slot is not available - show conflicts and alternatives
        conflict_msg = f"Sorry, the slot {start_time} - {end_time} on {date} is already booked"
        if get_conflicting_appointments(date, start_time, end_time):
            conflict_msg += " (booked by other user)"

        # Get alternative slots for the same day
        available_alternatives = get_available_slots_for_date(date)
        if available_alternatives:
            # Limit to first 5 alternatives for better UX
            alt_slots = available_alternatives[:5]
            state["available_slots"] = alt_slots
            alternatives_text = "\n".join(f"• {slot}" for slot in alt_slots)
            state["messages"].append({
                "role": "assistant",
                "content": f"{conflict_msg}.\n\nHere are some available alternatives for {date}:\n\n{alternatives_text}\n\nWhich slot would you prefer? Just say the time (e.g., '2:00 PM - 2:30 PM')."
            })
        else:
            state["messages"].append({
                "role": "assistant",
                "content": f"{conflict_msg}.\n\nUnfortunately, there are no other available slots on {date}. Would you like to try a different date?"
            })
            state["available_slots"] = []
        state["needs_more_info"] = True  # Wait for the user to choose
    elif intent == "check_availability" and date:
        # Just checking availability, not booking
        available_slots = get_available_slots_for_date(date)
        if available_slots:
            slots_text = "\n".join(f"• {slot}" for slot in available_slots[:10])  # Show max 10 slots
            state["messages"].append({
                "role": "assistant",
                "content": f"Here are the available slots for {date}:\n\n{slots_text}\n\nWould you like to book any of these slots?"
            })
            state["available_slots"] = available_slots
        else:
            state["messages"].append({
                "role": "assistant",
                "content": f"Sorry, there are no available slots on {date}. Would you like to check a different date?"
            })
            state["available_slots"] = []
    return state
def booking_node(state: Dict[str, Any]):
    """Book one of ``state['available_slots']`` through the global appointment store.

    The slot whose start time (or full text) appears in the user input is
    chosen; if none matches, the first available slot is used. The outcome of
    ``book_global_appointment`` decides the reply: a confirmation, a refreshed
    list of alternatives when the slot was taken meanwhile, or an error message.

    Args:
        state: Mutable workflow state; ``date`` and ``user_email`` are read
            when a slot is booked.

    Returns:
        The same ``state`` dict, updated in place.
    """
    if state.get("needs_more_info") or state.get("auth_required"):
        return state
    state.setdefault("messages", [])
    if state.get("intent") != "book_appointment":
        return state

    slots = state.get("available_slots")
    if not slots:
        # No available slots, ask for different time/date
        _booking_reply(state, "I don't have any available slots to book. Would you like to try a different date or time?")
        state["needs_more_info"] = True
        return state

    user_input_lower = (state.get("user_input") or "").lower()
    # If no specific slot was mentioned, use the first available one.
    selected_slot = _match_slot(slots, user_input_lower) or slots[0]

    slot_parts = selected_slot.split(" - ")
    if len(slot_parts) != 2:
        _booking_reply(state, "I couldn't parse the selected time slot. Please specify the time again.")
        state["needs_more_info"] = True
        return state

    start_time = slot_parts[0].strip()
    end_time = slot_parts[1].strip()
    date = state["date"]

    # Book the appointment globally
    result = book_global_appointment(state["user_email"], date, start_time, end_time)
    status = result.get("status")
    if status == "confirmed":
        _booking_reply(state, f"✅ Perfect! Your appointment is booked for {date} from {start_time} to {end_time}. Your appointment ID is #{result['appointment_id']}. Is there anything else I can help you with?")
        # Clear the slots since booking is complete
        state["available_slots"] = []
        state["needs_more_info"] = False
    elif status == "unavailable":
        _booking_reply(state, "Sorry, that slot was just booked by someone else! Let me check for other available slots...")
        # Refresh available slots
        new_slots = get_available_slots_for_date(date)
        state["available_slots"] = new_slots[:5]
        if new_slots:
            alternatives_text = "\n".join(f"• {slot}" for slot in new_slots[:5])
            _booking_reply(state, f"Here are the updated available slots:\n\n{alternatives_text}\n\nWhich one would you like?")
            state["needs_more_info"] = True
    else:
        _booking_reply(state, f"Sorry, I encountered an error while booking: {result.get('error', 'Unknown error')}. Please try again.")
        state["needs_more_info"] = True
    return state


def _booking_reply(state: Dict[str, Any], content: str) -> None:
    """Append an assistant message to the state's message list."""
    state["messages"].append({"role": "assistant", "content": content})


def _match_slot(slots: List[str], user_input_lower: str) -> Optional[str]:
    """Return the first "start - end" slot the user input refers to, if any.

    A candidate must not be preceded by a digit or colon, so "2 pm" does not
    match inside "12 pm".
    """
    import re

    for slot in slots:
        slot_times = slot.split(" - ")
        if len(slot_times) != 2:
            continue
        start = slot_times[0].strip().lower()
        candidates = {slot.lower(), start, start.replace(":00", "")}
        # Also accept the start time written without a leading zero.
        candidates |= {c.lstrip("0") for c in candidates if c.lstrip("0")}
        for candidate in candidates:
            if re.search(r"(?<![\d:])" + re.escape(candidate), user_input_lower):
                return slot
    return None
def show_appointments_node(state: Dict[str, Any]):
    """Reply with the user's appointments from the global appointment store.

    Only acts when the intent is ``show_appointments`` and a ``user_email`` is
    present; otherwise the state is returned unchanged apart from ensuring a
    ``messages`` list exists.
    """
    if state.get("needs_more_info") or state.get("auth_required"):
        return state
    state.setdefault("messages", [])
    if state.get("intent") == "show_appointments" and state.get("user_email"):
        appointments = get_all_global_appointments(state["user_email"])
        if appointments:
            # Format appointments for display, one bullet per appointment.
            appt_lines = [
                f"• **{appt['title']}** on {appt['date']} from {appt['start_time']} to {appt['end_time']} (ID: #{appt['id']})"
                for appt in appointments
            ]
            reply = "Here are your upcoming appointments:\n\n" + "\n".join(appt_lines)
            reply += "\n\nWould you like to book another appointment or make any changes?"
        else:
            reply = "You have no upcoming appointments. Would you like to book one?"
        state["messages"].append({"role": "assistant", "content": reply})
    return state
# In-memory map of browser session id -> pending-auth details
# (``user_email``, ``status``, ``auth_url``), filled by ``/initiate_auth``.
# Annotated with ``typing.Any``; the builtin function ``any`` is not a type.
browser_sessions: Dict[str, Dict[str, Any]] = {}
def init_browser_sessions_db():
    """Create the ``browser_sessions`` table in ``user_data.db`` if it is missing."""
    conn = sqlite3.connect("user_data.db")
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS browser_sessions (
                session_id TEXT PRIMARY KEY,
                user_email TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        # Close even when the statement fails so the handle is not leaked.
        conn.close()
def store_browser_session(session_id: str, user_email: str):
    """Insert or replace the ``session_id -> user_email`` row in ``browser_sessions``.

    ``last_accessed`` is set to the current timestamp; because the row is
    replaced, ``created_at`` is reset to its default as well.
    """
    conn = sqlite3.connect("user_data.db")
    try:
        conn.execute('''
            INSERT OR REPLACE INTO browser_sessions (session_id, user_email, last_accessed)
            VALUES (?, ?, CURRENT_TIMESTAMP)
        ''', (session_id, user_email))
        conn.commit()
    finally:
        # Close even when the statement fails so the handle is not leaked.
        conn.close()
def get_session_user(session_id: str) -> Optional[str]:
    """Return the email stored for *session_id*, or ``None`` if there is no such session."""
    conn = sqlite3.connect("user_data.db")
    try:
        row = conn.execute(
            'SELECT user_email FROM browser_sessions WHERE session_id = ?',
            (session_id,),
        ).fetchone()
    finally:
        # Close even when the query fails so the handle is not leaked.
        conn.close()
    return row[0] if row else None
def get_user_sessions(user_email: str) -> List[str]:
    """Return every session id stored for *user_email* (empty list if none)."""
    conn = sqlite3.connect("user_data.db")
    try:
        rows = conn.execute(
            'SELECT session_id FROM browser_sessions WHERE user_email = ?',
            (user_email,),
        ).fetchall()
    finally:
        # Close even when the query fails so the handle is not leaked.
        conn.close()
    return [row[0] for row in rows]
def delete_browser_session(session_id: str, user_email: Optional[str] = None):
    """Delete the row for *session_id* from ``browser_sessions``.

    Args:
        session_id: Session to remove.
        user_email: When truthy, the row is only removed if it also belongs
            to this email.
    """
    conn = sqlite3.connect("user_data.db")
    try:
        if user_email:
            conn.execute(
                'DELETE FROM browser_sessions WHERE session_id = ? AND user_email = ?',
                (session_id, user_email),
            )
        else:
            conn.execute('DELETE FROM browser_sessions WHERE session_id = ?', (session_id,))
        conn.commit()
    finally:
        # Close even when the statement fails so the handle is not leaked.
        conn.close()
def cleanup_old_sessions():
    """Delete ``browser_sessions`` rows whose ``last_accessed`` is over 30 days old."""
    conn = sqlite3.connect("user_data.db")
    try:
        conn.execute('''
            DELETE FROM browser_sessions
            WHERE last_accessed < datetime('now', '-30 days')
        ''')
        conn.commit()
    finally:
        # Close even when the statement fails so the handle is not leaked.
        conn.close()
def create_workflow():
    """Build and compile the LangGraph workflow for the appointment agent.

    Flow: ``email_collection`` -> ``intent_detection`` -> ``validation`` ->
    (``availability`` -> ``booking`` | ``show_appointments``) -> END, with
    early exits whenever the state is waiting for the user or for OAuth.
    """

    def _after_email_collection(state):
        # Only proceed if authenticated and have email
        if state.get('needs_more_info', False) or state.get('auth_required', False):
            return "end"
        return "intent_detection"

    def _after_validation(state):
        # A missing ``needs_more_info`` is treated as "still waiting".
        if state.get('auth_required', False) or state.get('needs_more_info', True):
            return "end"
        intent = state.get('intent')
        if intent == 'show_appointments':
            return "show_appointments"
        if intent in ['book_appointment', 'check_availability']:
            return "availability"
        return "end"

    builder = StateGraph(AgentState)
    node_table = (
        ("email_collection", email_collection_node),
        ("intent_detection", intent_detection_node),
        ("validation", validation_node),
        ("availability", availability_node),
        ("booking", booking_node),
        ("show_appointments", show_appointments_node),
    )
    for node_name, node_fn in node_table:
        builder.add_node(node_name, node_fn)

    # ALWAYS start with email collection/authentication
    builder.set_entry_point("email_collection")

    builder.add_conditional_edges(
        "email_collection",
        _after_email_collection,
        {"intent_detection": "intent_detection", "end": END},
    )
    builder.add_edge("intent_detection", "validation")
    builder.add_conditional_edges(
        "validation",
        _after_validation,
        {
            "availability": "availability",
            "show_appointments": "show_appointments",
            "end": END,
        },
    )
    builder.add_edge("availability", "booking")
    builder.add_edge("booking", END)
    builder.add_edge("show_appointments", END)
    return builder.compile()


# Compiled once at import time and shared by the request handlers.
graph = create_workflow()
@app.post("/store_session_user")
async def store_session_user(request: dict):
    """Persist the browser-session -> user-email mapping sent in the request body.

    Expects ``browser_session_id`` and ``user_email`` keys; replies with a
    ``status`` of ``"success"`` or ``"error"`` (plus a ``message`` on error).
    """
    try:
        session_id = request.get("browser_session_id")
        user_email = request.get("user_email")
        if not (session_id and user_email):
            return {"status": "error", "message": "Missing session_id or user_email"}
        store_browser_session(session_id, user_email)
        return {"status": "success"}
    except Exception as e:
        logger.exception("Error storing session user")
        return {"status": "error", "message": str(e)}
@app.get("/get_session_user")
async def get_session_user_endpoint(browser_session_id: str):
    """Look up the user bound to a browser session and report whether they are authorized.

    A user counts as authorized when stored credentials exist and a calendar
    service can be built from them; credentials that fail are deleted.
    """
    try:
        user_email = get_session_user(browser_session_id)
        if not user_email:
            return {"user_email": None, "authorized": False}

        # Check if user is still authenticated
        authorized = False
        if load_user_credentials(user_email):
            try:
                get_calendar_service(user_email)
            except Exception:
                delete_user_credentials(user_email)
            else:
                authorized = True
        return {
            "user_email": user_email,
            "authorized": authorized
        }
    except Exception as e:
        logger.exception("Error getting session user")
        return {"user_email": None, "authorized": False, "error": str(e)}
@app.post("/initiate_auth")
async def initiate_auth(request: dict):
    """Start an OAuth flow for ``user_email`` and return the auth URL.

    A fresh URL is generated on every call. When ``browser_session_id`` is
    supplied, the pending auth is also recorded in ``browser_sessions``.
    """
    try:
        user_email = request.get("user_email")
        if not user_email:
            return {"status": "error", "message": "user_email is required"}

        # SECURITY: always generate a fresh auth URL for new browser sessions;
        # credentials are not reused across browser sessions.
        auth_url = generate_auth_url(user_email)

        session_key = request.get("browser_session_id")
        if session_key:
            # Store pending session
            pending = {
                "user_email": user_email,
                "status": "pending_auth",
                "auth_url": auth_url
            }
            browser_sessions[session_key] = pending

        return {
            "status": "pending_auth",
            "auth_url": auth_url,
            "message": f"Fresh auth required for {user_email}"
        }
    except Exception as e:
        logger.exception("Error initiating auth")
        return {
            "status": "error",
            "auth_url": "",
            "message": f"Failed to initiate auth: {str(e)}"
        }
@app.get("/authenticated_users")
async def get_authenticated_users(browser_session_id: str = None):
    """List the authenticated user (if any) bound to this specific browser session.

    The session's user is included only when stored credentials exist and a
    calendar service can be built; failing credentials and the session row
    are deleted.
    """
    try:
        verified_users = []
        # SECURITY: Only get users for THIS specific browser session
        user_email = get_session_user(browser_session_id) if browser_session_id else None
        # Verify credentials are still valid
        if user_email and load_user_credentials(user_email):
            try:
                get_calendar_service(user_email)
            except Exception:
                delete_user_credentials(user_email)
                delete_browser_session(browser_session_id, user_email)
            else:
                verified_users.append(user_email)
        return {
            "browser_authenticated_users": verified_users,
            "count": len(verified_users)
        }
    except Exception as e:
        logger.exception("Error getting authenticated users")
        return {"error": str(e), "browser_authenticated_users": []}
@app.get("/user_status/{user_email}")
async def get_user_status(user_email: str):
    """Report whether *user_email* has stored credentials that can build a calendar service.

    ``status`` is one of ``no_credentials``, ``valid_credentials``,
    ``invalid_credentials`` (credentials are then deleted) or ``error``.
    """
    try:
        # Check if user has valid credentials
        if not load_user_credentials(user_email):
            return {
                "user_email": user_email,
                "authenticated": False,
                "status": "no_credentials"
            }

        # Try to get calendar service
        try:
            get_calendar_service(user_email)
        except Exception as e:
            # Credentials exist but are invalid
            delete_user_credentials(user_email)
            return {
                "user_email": user_email,
                "authenticated": False,
                "status": "invalid_credentials",
                "error": str(e)
            }
        return {
            "user_email": user_email,
            "authenticated": True,
            "status": "valid_credentials",
            "can_access_calendar": True
        }
    except Exception as e:
        logger.exception(f"Error checking status for {user_email}")
        return {
            "user_email": user_email,
            "authenticated": False,
            "status": "error",
            "error": str(e)
        }
@app.get("/database_status")
async def database_status():
    """Return a health flag, the table names and per-table row counts of ``user_data.db``.

    Also lists the keys of the in-memory ``user_credentials`` mapping. On any
    failure the reply is ``{"error": ..., "healthy": False}``.
    """
    try:
        healthy = check_database_health()
        conn = sqlite3.connect("user_data.db")
        try:
            cur = conn.cursor()
            cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = [row[0] for row in cur.fetchall()]
            counts = {}
            for tbl in tables:
                # Identifiers cannot be bound as parameters, so quote the name
                # (doubling embedded quotes) before interpolating it.
                quoted = '"' + tbl.replace('"', '""') + '"'
                counts[tbl] = cur.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
        finally:
            # Close even when a query fails so the handle is not leaked.
            conn.close()
        return {
            "healthy": healthy,
            "tables": tables,
            "record_counts": counts,
            "in_memory_users": list(user_credentials.keys()),
        }
    except Exception as e:
        return {"error": str(e), "healthy": False}
@app.post("/fix_database")
async def fix_database():
    """Call ``reset_database`` and report the outcome as a ``status`` dict.

    NOTE(review): this endpoint has no visible access control — confirm that
    it is not exposed publicly.
    """
    try:
        reset_database()
    except Exception as e:
        return {"status": "error", "message": str(e)}
    else:
        return {"status": "success"}
@app.post("/process_input")
async def process_input(req: BookingRequest):
    """Run one conversation turn through the workflow graph.

    Loads the stored state for ``req.conversation_id``, adds the new user
    message, invokes the graph, saves the resulting state and returns the
    last assistant message together with that state.
    """
    try:
        state = load_conversation_state(req.conversation_id)

        # Per-turn fields; the request email wins over the stored one.
        state["user_input"] = req.user_input
        state["conversation_id"] = req.conversation_id
        state["user_email"] = req.user_email or state.get("user_email")

        # Preserve the entire conversation context, filling in defaults for
        # anything the stored state does not have yet.
        for key, default in (
            ("intent", ""),
            ("date", None),
            ("time_range", None),
            ("specific_time", None),
            ("available_slots", []),
            ("confirmed_slot", ""),
        ):
            state[key] = state.get(key, default)

        user_turn = {"role": "user", "content": req.user_input}
        state["messages"] = state.get("messages", []) + [user_turn]

        for key, default in (
            ("last_intent", ""),
            ("needs_more_info", False),
            ("auth_required", False),
            ("auth_url", ""),
        ):
            state[key] = state.get(key, default)

        final_state = graph.invoke(state)
        save_conversation_state(req.conversation_id, final_state)

        reply = "I'm here to help!"
        for message in reversed(final_state.get("messages", [])):
            if message["role"] == "assistant":
                reply = message["content"]
                break
        return {"response": reply, "state": final_state}
    except Exception as e:
        logger.exception("process_input failed")
        return {"response": f"Error: {e}", "state": {}}
@app.post("/complete_auth")
async def complete_auth(user_email: str, auth_code: str):
    """Finish the OAuth flow for ``user_email`` using ``auth_code``.

    Returns:
        ``{"status": "success"}`` when ``complete_oauth_flow`` returns a truthy
        value, otherwise ``{"status": "error"}``.
    """
    succeeded = complete_oauth_flow(user_email, auth_code)
    if succeeded:
        return {"status": "success"}
    return {"status": "error"}
@app.get("/callback")
async def oauth_callback(code: str):
    """Handle the OAuth redirect: exchange ``code`` for tokens and store them.

    Args:
        code: Authorization code supplied as a query parameter.

    Returns:
        A redirect to the frontend: ``?auth=success`` on success,
        ``?error=no_pending_auth`` when no entry in ``user_credentials`` is
        pending, or ``?error=token_exchange_failed`` when the exchange or the
        credential storage raises.
    """
    # NOTE(review): the first entry with status "pending_auth" is used; with
    # several logins in flight the code may be matched to the wrong user.
    # Consider correlating via the OAuth `state` parameter.
    user_email = next(
        (
            email
            for email, entry in user_credentials.items()
            if entry.get("status") == "pending_auth"
        ),
        None,
    )
    if not user_email:
        return RedirectResponse("http://localhost:8501?error=no_pending_auth")

    try:
        flow = user_credentials[user_email]["flow"]
        flow.fetch_token(code=code)
        store_user_credentials(user_email, flow.credentials)
    except Exception:
        # Send the user back to the frontend with an error instead of a bare 500.
        logger.exception("OAuth callback failed for %s", user_email)
        return RedirectResponse("http://localhost:8501?error=token_exchange_failed")

    user_credentials[user_email]["status"] = "completed"
    return RedirectResponse("http://localhost:8501?auth=success")
def _start_pending_auth(user_email: str, browser_session_id: str) -> dict:
    """Generate a fresh auth URL, record it for the browser session, build the reply."""
    auth_url = generate_auth_url(user_email)
    if browser_session_id:
        browser_sessions[browser_session_id] = {
            "user_email": user_email,
            "status": "pending_auth",
            "auth_url": auth_url,
        }
    return {
        "authorized": False,
        "status": "pending_auth",
        "auth_url": auth_url,
    }


@app.get("/check_auth")
async def check_auth(user_email: str, browser_session_id: str = ""):
    """Check whether ``user_email`` is authorized within a browser session.

    Args:
        user_email: Account to check.
        browser_session_id: Optional browser session identifier. When given and
            ``get_session_user`` does not return ``user_email`` for it, a fresh
            authorization is always requested.

    Returns:
        ``{"authorized": True, "status": "completed", "auth_url": ""}`` when
        stored credentials yield a calendar service; a ``pending_auth`` reply
        with a new ``auth_url`` otherwise; or a ``status: "error"`` reply with
        an ``error`` message when anything unexpected raises.
    """
    try:
        # A session that is not tied to this user must authenticate from scratch.
        if browser_session_id and get_session_user(browser_session_id) != user_email:
            return _start_pending_auth(user_email, browser_session_id)

        if load_user_credentials(user_email):
            try:
                get_calendar_service(user_email)
            except Exception:
                # Credentials exist but are unusable: drop them and the session link.
                delete_user_credentials(user_email)
                if browser_session_id:
                    delete_browser_session(browser_session_id, user_email)
            else:
                # Kept outside the try so a session-store failure cannot be
                # mistaken for invalid credentials and delete them.
                if browser_session_id:
                    store_browser_session(browser_session_id, user_email)
                return {"authorized": True, "status": "completed", "auth_url": ""}

        return _start_pending_auth(user_email, browser_session_id)
    except Exception as e:
        logger.exception("Error in check_auth for %s", user_email)
        return {
            "authorized": False,
            "status": "error",
            "auth_url": "",
            "error": str(e),
        }
@app.post("/logout")
async def logout(request: dict):
    """Log a user out of one browser session, or entirely when no session is given.

    Reads ``user_email`` and ``browser_session_id`` from the request body.

    Returns:
        ``{"status": "success"}`` or ``{"status": "error", "message": ...}``.
    """
    try:
        email = request.get("user_email")
        session_id = request.get("browser_session_id")
        if not session_id:
            # No session supplied: remove the user's credentials altogether.
            delete_user_credentials(email)
            user_credentials.pop(email, None)
        else:
            # Only detach this browser session.
            delete_browser_session(session_id, email)
            browser_sessions.pop(session_id, None)
        return {"status": "success"}
    except Exception as exc:
        logger.exception("Error in logout")
        return {"status": "error", "message": str(exc)}
def run_fastapi(host: str = "127.0.0.1", port: int = 8080, log_level: str = "info"):
    """Serve ``app`` with uvicorn.

    Args:
        host: Interface to bind; defaults to loopback.
        port: TCP port to listen on.
        log_level: Log level name passed to uvicorn.
    """
    uvicorn.run(app, host=host, port=port, log_level=log_level)
def _periodic_cleanup():
    """Run the credential and session cleanups every five minutes, forever.

    Each cleanup is guarded separately so that a failure in one neither skips
    the other nor terminates the background thread.
    """
    while True:
        time.sleep(300)  # 5 minutes
        for task in (cleanup_expired_credentials, cleanup_old_sessions):
            try:
                task()
            except Exception:
                logger.exception("Periodic cleanup step %s failed", task.__name__)


# Daemon thread: it will not keep the process alive at shutdown.
threading.Thread(target=_periodic_cleanup, daemon=True).start()
# This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
# Learn more about bidirectional Unicode characters
import time | |
import uuid | |
import requests | |
import streamlit as st | |
import urllib.parse | |
import hashlib | |
import json | |
from datetime import datetime, timedelta | |
import streamlit.components.v1 as components | |
BACKEND = "http://127.0.0.1:8080" | |
# --------------------------------------------------------------------------- | |
# Cookie Management Functions | |
# --------------------------------------------------------------------------- | |
def set_browser_cookie(key: str, value: str, days: int = 30):
    """Emit a zero-height HTML component whose script sets a browser cookie.

    Args:
        key: Cookie name.
        value: Cookie value.
        days: Lifetime in days; a falsy value produces a cookie without an
            ``expires`` attribute.
    """
    # Serialize the arguments as JS literals so quotes, backslashes or a
    # "</script>" sequence in them cannot break out of the script.
    key_js = json.dumps(key).replace("</", "<\\/")
    value_js = json.dumps(value).replace("</", "<\\/")
    cookie_script = f"""
    <script>
    function setCookie(name, value, days) {{
        var expires = "";
        if (days) {{
            var date = new Date();
            date.setTime(date.getTime() + (days*24*60*60*1000));
            expires = "; expires=" + date.toUTCString();
        }}
        document.cookie = name + "=" + (value || "") + expires + "; path=/";
    }}
    setCookie({key_js}, {value_js}, {json.dumps(days)});
    </script>
    """
    components.html(cookie_script, height=0)
def get_browser_cookie(key: str) -> None:
    """Emit a script that reads cookie ``key`` and posts it to the parent window.

    The function itself returns nothing: the value is only sent via
    ``window.parent.postMessage`` as ``{type: 'cookie_value', key, value}``.

    Args:
        key: Cookie name to look up.
    """
    # NOTE(review): no receiver for the posted message is visible here, so the
    # value does not reach Python through this call - confirm how it is consumed.
    # Serialize the name as a JS literal so it cannot break out of the script.
    key_js = json.dumps(key).replace("</", "<\\/")
    cookie_script = f"""
    <script>
    function getCookie(name) {{
        var nameEQ = name + "=";
        var ca = document.cookie.split(';');
        for(var i=0;i < ca.length;i++) {{
            var c = ca[i];
            while (c.charAt(0)==' ') c = c.substring(1,c.length);
            if (c.indexOf(nameEQ) == 0) return c.substring(nameEQ.length,c.length);
        }}
        return null;
    }}
    var cookieValue = getCookie({key_js});
    if (cookieValue) {{
        window.parent.postMessage({{
            type: 'cookie_value',
            key: {key_js},
            value: cookieValue
        }}, '*');
    }}
    </script>
    """
    components.html(cookie_script, height=0)
def delete_browser_cookie(key: str):
    """Emit a script that expires cookie ``key`` (path ``/``) in the browser.

    Args:
        key: Cookie name to remove.
    """
    # Serialize the name as a JS literal so it cannot break out of the script.
    key_js = json.dumps(key).replace("</", "<\\/")
    cookie_script = f"""
    <script>
    function deleteCookie(name) {{
        document.cookie = name + '=; expires=Thu, 01 Jan 1970 00:00:01 GMT; path=/';
    }}
    deleteCookie({key_js});
    </script>
    """
    components.html(cookie_script, height=0)
def generate_browser_session_id():
    """Return a new 32-character alphanumeric session ID.

    The ID is used to tie a browser to authenticated users, so it is drawn from
    the ``secrets`` CSPRNG rather than the predictable ``random`` generator.
    """
    import secrets
    import string

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(32))
# --------------------------------------------------------------------------- | |
# Simplified Authentication Functions | |
# --------------------------------------------------------------------------- | |
def get_authenticated_users_for_browser():
    """Return the users the backend lists for this browser session.

    Yields an empty list when no ``browser_session_id`` is in session state,
    when the backend does not answer 200, or when the request fails (the
    failure is shown with ``st.error``).
    """
    if 'browser_session_id' not in st.session_state:
        return []

    users = []
    try:
        reply = requests.get(
            f"{BACKEND}/authenticated_users",
            params={"browser_session_id": st.session_state.browser_session_id},
            timeout=5
        )
        if reply.status_code == 200:
            # Only the per-browser list is used, not a global one.
            users = reply.json().get("browser_authenticated_users", [])
    except Exception as err:
        st.error(f"Error checking authenticated users: {err}")
    return users
def validate_user_with_backend(user_email: str) -> dict:
    """Ask the backend's ``/check_auth`` endpoint about ``user_email``.

    Returns:
        A dict with ``authorized``, ``status`` and ``auth_url``. When the
        backend does not answer 200 or the request fails, the result is
        ``{'authorized': False, 'status': 'error', 'auth_url': ''}``.
    """
    outcome = {'authorized': False, 'status': 'error', 'auth_url': ''}
    try:
        query = {
            "user_email": user_email,
            "browser_session_id": st.session_state.get('browser_session_id', '')
        }
        reply = requests.get(f"{BACKEND}/check_auth", params=query, timeout=5)
        if reply.status_code == 200:
            payload = reply.json()
            outcome = {
                'authorized': payload.get('authorized', False),
                'status': payload.get('status', 'unknown'),
                'auth_url': payload.get('auth_url', '')
            }
    except Exception as err:
        st.error(f"Error validating user: {err}")
    return outcome
def initiate_auth_for_user(user_email: str) -> dict:
    """Request a new authentication flow for ``user_email`` from the backend.

    Returns:
        ``{'success': True, 'auth_url': ..., 'status': ...}`` on a 200 reply,
        otherwise ``{'success': False, 'auth_url': '', 'status': 'error'}``
        (request errors are also shown with ``st.error``).
    """
    outcome = {'success': False, 'auth_url': '', 'status': 'error'}
    try:
        body = {
            "user_email": user_email,
            "browser_session_id": st.session_state.get('browser_session_id', '')
        }
        reply = requests.post(f"{BACKEND}/initiate_auth", json=body, timeout=5)
        if reply.status_code == 200:
            payload = reply.json()
            outcome = {
                'success': True,
                'auth_url': payload.get('auth_url', ''),
                'status': payload.get('status', 'pending_auth')
            }
    except Exception as err:
        st.error(f"Error initiating auth: {err}")
    return outcome
def initialize_browser_session():
    """Ensure a browser session ID exists and restore the user tied to it.

    First makes sure ``st.session_state.browser_session_id`` is set, generating
    a new ID and writing it to the ``appointment_session_id`` cookie when it is
    missing. Then, if no ``user_email`` is in session state, asks the backend's
    ``/get_session_user`` endpoint and copies ``user_email`` / ``auth_required``
    from the reply. Backend errors are ignored (best effort).
    """
    # Generate or retrieve browser session ID
    if 'browser_session_id' not in st.session_state:
        # Try to get from cookie first
        if 'cookie_checked' not in st.session_state:
            st.session_state.cookie_checked = True
            # This will trigger the cookie retrieval
            get_browser_cookie('appointment_session_id')
            # Wait a bit for cookie retrieval (in real implementation, you'd handle this differently)
            # NOTE(review): nothing in this function copies a retrieved cookie value
            # into session_state, so the check below appears to always generate a
            # new ID - confirm.
            time.sleep(0.1)
        # If no cookie found, generate new session ID
        if 'browser_session_id' not in st.session_state:
            st.session_state.browser_session_id = generate_browser_session_id()
            # Set cookie for this browser
            set_browser_cookie('appointment_session_id', st.session_state.browser_session_id)
    # Check if user was previously authenticated in this browser
    if st.session_state.get('browser_session_id') and not st.session_state.get('user_email'):
        try:
            response = requests.get(
                f"{BACKEND}/get_session_user",
                params={"browser_session_id": st.session_state.browser_session_id},
                timeout=5
            )
            if response.status_code == 200:
                result = response.json()
                if result.get('user_email'):
                    st.session_state.user_email = result['user_email']
                    # Authorization is required unless the backend reports it as granted.
                    st.session_state.auth_required = not result.get('authorized', False)
        except Exception:
            # Best effort: an unreachable backend leaves the user logged out.
            pass
def get_authenticated_users():
    """Return the backend's ``authenticated_users`` list.

    Yields an empty list when the backend does not answer 200 or the request
    fails (the failure is shown with ``st.error``).
    """
    users = []
    try:
        reply = requests.get(f"{BACKEND}/authenticated_users", timeout=5)
        if reply.status_code == 200:
            users = reply.json().get("authenticated_users", [])
    except Exception as err:
        st.error(f"Error checking authenticated users: {err}")
    return users
def check_user_authentication(email):
    """Return the ``authenticated`` flag the backend reports for ``email``.

    Falls back to ``False`` when the backend does not answer 200 or the
    request fails; errors are deliberately ignored.
    """
    is_authenticated = False
    try:
        reply = requests.get(f"{BACKEND}/user_status/{email}", timeout=5)
        if reply.status_code == 200:
            is_authenticated = reply.json().get('authenticated', False)
    except Exception:
        pass
    return is_authenticated
def handle_streamlit_auth() -> bool:
    """Render the Google authorization prompt when one is pending.

    Returns:
        ``True`` when the prompt was rendered (both ``auth_required`` and
        ``auth_url`` are set in session state), ``False`` otherwise.
    """
    prompt_needed = st.session_state.get("auth_required") and st.session_state.get("auth_url")
    if not prompt_needed:
        return False

    st.warning("π Google Calendar authorization required")
    steps_col, action_col = st.columns([3, 1])
    with steps_col:
        st.markdown(f"""
**Step 1:** [Click here to authorize Google Calendar access]({st.session_state.auth_url})
**Step 2:** Complete the authorization in the new tab
**Step 3:** Return to this page and click "Check Authorization"
""")
    with action_col:
        if st.button("π Check Authorization", key="manual_check"):
            try:
                auth_result = validate_user_with_backend(st.session_state.user_email)
                if not auth_result['authorized']:
                    st.info(f"Status: {auth_result.get('status', 'pending')}")
                    st.info("Please complete the authorization in the other tab first.")
                else:
                    st.success("β Authorization completed!")
                    # Clear the pending flags, then redraw the page.
                    st.session_state.auth_required = False
                    st.session_state.auth_url = ""
                    st.rerun()
            except Exception as err:
                st.error(f"Error checking auth status: {err}")
    return True
def render_login_interface():
    """Render the sidebar login UI.

    Shows one quick-login button per user returned by
    ``get_authenticated_users_for_browser`` and, always, a text input for
    adding a new account. Choosing either sets ``user_email`` and the
    ``auth_required`` / ``auth_url`` session-state fields, then reruns the script.
    """
    st.subheader("π Account Login")
    # Get authenticated users for THIS specific browser only
    authenticated_users = get_authenticated_users_for_browser()
    if authenticated_users:
        # Show quick login for users authenticated in THIS browser
        st.markdown("**Your authenticated accounts:**")
        for i, user in enumerate(authenticated_users):
            if st.button(f"π§ {user}", key=f"quick_login_{i}", use_container_width=True):
                st.session_state.user_email = user
                # Verify authentication for this specific browser session
                auth_result = validate_user_with_backend(user)
                if auth_result['authorized']:
                    st.session_state.auth_required = False
                    st.success(f"Logged in as {user}")
                    st.rerun()
                else:
                    st.session_state.auth_required = True
                    st.session_state.auth_url = auth_result.get('auth_url', '')
                    # The check returned no URL: explicitly start a new auth flow.
                    if not st.session_state.auth_url:
                        auth_init = initiate_auth_for_user(user)
                        st.session_state.auth_url = auth_init.get('auth_url', '')
                    st.warning("Re-authentication required")
                    st.rerun()
        st.markdown("---")
    # Always show option to add new account
    st.markdown("**Add new account:**")
    new_email = st.text_input(
        "Email address:",
        placeholder="your.email@gmail.com",
        key="new_email_input",
        label_visibility="collapsed"
    )
    if st.button("Continue with Email", disabled=not new_email, use_container_width=True):
        # Minimal sanity check only: the address must contain "@".
        if new_email and "@" in new_email:
            st.session_state.user_email = new_email
            # Always require fresh authentication for new emails
            st.session_state.auth_required = True
            auth_init = initiate_auth_for_user(new_email)
            st.session_state.auth_url = auth_init.get('auth_url', '')
            st.info("Authentication required for this account")
            st.rerun()
        else:
            st.error("Please enter a valid email address")
# --------------------------------------------------------------------------- | |
# Main Streamlit Application | |
# --------------------------------------------------------------------------- | |
def _show_assistant_message(text: str) -> None:
    """Render ``text`` as an assistant chat bubble and append it to the history."""
    with st.chat_message("assistant"):
        st.markdown(text)
    st.session_state.messages.append({
        "role": "assistant",
        "content": text
    })


def main():
    """Render the Streamlit page for the appointment assistant.

    Order of work: page config and browser session, session-state defaults,
    OAuth result query parameters, sidebar (account status or login), the
    pending-authorization prompt, then either the logged-out welcome screen or
    the chat interface, and finally the optional debug panel.
    """
    st.set_page_config(page_title="AI Appointment Assistant", page_icon="π ")
    # Initialize browser session first
    initialize_browser_session()

    # Initialize session state
    if "conversation_id" not in st.session_state:
        st.session_state.conversation_id = str(uuid.uuid4())
    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "user_email" not in st.session_state:
        st.session_state.user_email = None
    if "auth_required" not in st.session_state:
        st.session_state.auth_required = False
    if "auth_url" not in st.session_state:
        st.session_state.auth_url = ""

    # Handle URL parameters for auth success/error (best effort)
    try:
        if hasattr(st, "query_params"):
            qp = st.query_params
            if "auth" in qp and qp["auth"] == "success":
                st.success("β Authorization successful! You can now book appointments.")
                st.session_state.auth_required = False
                st.session_state.auth_url = ""
                qp.clear()
                st.rerun()
            elif "error" in qp:
                st.error(f"β Authorization failed: {qp['error']}")
                st.session_state.auth_required = False
                st.session_state.auth_url = ""
                qp.clear()
    except Exception:
        pass

    st.title("π AI Appointment Assistant")

    # Sidebar for user authentication
    with st.sidebar:
        if st.session_state.user_email:
            # User is logged in - show status and logout
            st.success(f"**Logged in as:**\n{st.session_state.user_email}")
            st.info(f"**Browser Session:** {st.session_state.browser_session_id[:8]}...")
            if st.session_state.auth_required:
                st.warning("β οΈ Authorization required")
            else:
                st.success("β Ready to help!")
            # Account actions
            st.markdown("---")
            col1, col2 = st.columns(2)
            with col1:
                if st.button("π Refresh", key="refresh_auth", use_container_width=True):
                    auth_result = validate_user_with_backend(st.session_state.user_email)
                    if auth_result['authorized']:
                        st.session_state.auth_required = False
                        st.session_state.auth_url = ""
                        st.success("β Refreshed!")
                    else:
                        st.session_state.auth_required = True
                        st.session_state.auth_url = auth_result.get('auth_url', '')
                        st.warning("Re-auth needed")
                    st.rerun()
            with col2:
                if st.button("π Logout", key="logout_btn", use_container_width=True):
                    try:
                        # Logout from this browser session; the reply is not inspected.
                        requests.post(
                            f"{BACKEND}/logout",
                            json={
                                "user_email": st.session_state.user_email,
                                "browser_session_id": st.session_state.browser_session_id
                            },
                            timeout=5,
                        )
                    except Exception as e:
                        st.error(f"Logout error: {e}")
                    # Clear cookies
                    delete_browser_cookie('appointment_session_id')
                    # Clear session state
                    st.session_state.user_email = None
                    st.session_state.auth_required = False
                    st.session_state.auth_url = ""
                    st.session_state.messages = []
                    st.session_state.conversation_id = str(uuid.uuid4())
                    # Generate new browser session ID
                    st.session_state.browser_session_id = generate_browser_session_id()
                    set_browser_cookie('appointment_session_id', st.session_state.browser_session_id)
                    st.success("π Logged out!")
                    st.rerun()
        else:
            # User not logged in - show login interface
            render_login_interface()

    # Handle authentication flow; nothing else is rendered while it is pending
    if handle_streamlit_auth():
        return

    # Show appropriate content based on login status
    if not st.session_state.user_email:
        # Welcome screen for new users
        st.markdown("""
### Welcome! π
I'm your AI appointment assistant. I can help you:
- π **Book appointments** - Just say "Book a meeting tomorrow at 3 PM"
- π **Check availability** - Ask "Am I free Friday afternoon?"
- π **View your schedule** - Say "Show my appointments"
**To get started, please log in using the sidebar** π
""")
        # Add some example interactions
        with st.expander("π‘ See example conversations"):
            st.markdown("""
**Booking an appointment:**
- "Book a meeting tomorrow at 2 PM"
- "Schedule a call with John next Friday from 10-11 AM"
**Checking availability:**
- "Am I free this Thursday afternoon?"
- "What's my availability next week?"
**Viewing schedule:**
- "Show my appointments"
- "What meetings do I have today?"
""")
        return

    # Main chat interface - only show if user is selected and authorized
    if st.session_state.user_email and not st.session_state.auth_required:
        # Display chat history
        for message in st.session_state.messages:
            with st.chat_message(message["role"]):
                st.markdown(message["content"])

        # Chat input
        if user_input := st.chat_input("Ask me to book an appointment or check availability..."):
            # Add user message to chat
            st.session_state.messages.append({"role": "user", "content": user_input})
            with st.chat_message("user"):
                st.markdown(user_input)
            # Process user input
            try:
                with st.spinner("Thinking..."):
                    response = requests.post(
                        f"{BACKEND}/process_input",
                        json={
                            "user_input": user_input,
                            "conversation_id": st.session_state.conversation_id,
                            "user_email": st.session_state.user_email,
                        },
                        timeout=30,
                    )
                if response.status_code == 200:
                    result = response.json()
                    backend_state = result.get("state", {})
                    # Update auth state from backend
                    st.session_state.auth_required = backend_state.get("auth_required", False)
                    st.session_state.auth_url = backend_state.get("auth_url", "")
                    if not st.session_state.auth_required:
                        # Show assistant response
                        _show_assistant_message(result["response"])
                    else:
                        # Need to re-authenticate
                        st.rerun()
                else:
                    _show_assistant_message(
                        "Sorry, I'm having trouble processing your request. Please try again."
                    )
            except requests.exceptions.RequestException:
                _show_assistant_message(
                    "Connection error. Please make sure the server is running and try again."
                )
            except Exception as e:
                _show_assistant_message(f"An unexpected error occurred: {str(e)}")
        elif not st.session_state.messages:
            # Welcome message for authenticated users with an empty chat. It is
            # tied to the chat-input branch so it is reachable: chained to the
            # outer `if`, its condition could never be true.
            st.success(f"β Ready to help! Authenticated as: {st.session_state.user_email}")
            st.markdown("""
### What would you like to do?
You can ask me to:
- Book a new appointment
- Check your availability
- Show your upcoming meetings
Just type your request below! π
""")

    # Debug info in sidebar (optional - remove in production)
    with st.sidebar:
        if st.checkbox("π§ Debug Info", key="debug_toggle"):
            st.markdown("---")
            st.subheader("Debug Information")
            st.json({
                "user_email": st.session_state.user_email,
                "auth_required": st.session_state.auth_required,
                "has_auth_url": bool(st.session_state.auth_url),
                "conversation_id": st.session_state.conversation_id,
                "message_count": len(st.session_state.messages)
            })


if __name__ == "__main__":
    main()
# Sign up for free
# to join this conversation on GitHub.
# Already have an account?
# Sign in to comment