Skip to content

Instantly share code, notes, and snippets.

@HrishikeshChaudhari24
Created June 27, 2025 12:48
Show Gist options
  • Save HrishikeshChaudhari24/a712e7c11b4e178a2bf17221bfebb06c to your computer and use it in GitHub Desktop.
Save HrishikeshChaudhari24/a712e7c11b4e178a2bf17221bfebb06c to your computer and use it in GitHub Desktop.
import json
import logging
import threading
import time
import uuid
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional,Set
import uvicorn
from fastapi import FastAPI
from fastapi.responses import RedirectResponse
from langgraph.graph import END, StateGraph
from pydantic import BaseModel
import sqlite3
from agent.models.database import (
cleanup_expired_credentials,
delete_user_credentials,
load_conversation_state,
load_user_credentials,
reset_database,
save_conversation_state,
store_user_credentials,
check_database_health,
user_credentials,
)
from agent.models.calendar import (
book_appointment,
check_availability,
complete_oauth_flow,
generate_auth_url,
get_calendar_service,
get_user_appointments,
get_user_timezone,
parse_time_range,
init_global_appointments_db,
check_global_slot_availability,
get_available_slots_for_date,
book_global_appointment,
get_all_global_appointments,
get_conflicting_appointments
)
init_global_appointments_db()
import google.generativeai as genai
from dateutil import parser
from groq import Groq
GROQ_API_KEY = "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"
GEMINI_MODELS = [
"gemini-2.0-flash",
"gemini-1.5-flash",
]
GEMINI_API_KEY = "AIzaSyCcd9zzBsgQ6j0gNQ_4TvA59xpHNsLQ7uQ"
genai.configure(api_key=GEMINI_API_KEY)
logger = logging.getLogger(__name__)
app = FastAPI()
def call_llama_groq(user_input: str) -> dict:
client = Groq(api_key=GROQ_API_KEY)
try:
chat_completion = client.chat.completions.create(
messages=[{"role": "user", "content": user_input}],
model="llama-3.3-70b-versatile",
stream=False,
)
content = chat_completion.choices[0].message.content
return {"intent": "unknown", "response": content}
except Exception as e:
return {"intent": "unknown", "error": f"Llama fallback failed: {e}"}
def fallback_parse_input(user_input: str, today: str) -> dict:
"""Simple fallback parser when Gemini fails"""
user_input_lower = user_input.lower()
# Basic intent detection
if any(word in user_input_lower for word in ["show", "my appointments", "meetings", "schedule"]):
intent = "show_appointments"
elif any(word in user_input_lower for word in ["book", "schedule", "appointment"]):
intent = "book_appointment"
elif any(word in user_input_lower for word in ["check", "availability", "free"]):
intent = "check_availability"
else:
intent = "unknown"
# Basic date parsing
date = None
if "today" in user_input_lower:
date = today
elif "tomorrow" in user_input_lower:
tomorrow = datetime.now() + timedelta(days=1)
date = tomorrow.strftime("%Y-%m-%d")
# Basic time parsing
specific_time = None
import re
# Look for time patterns like "11 pm", "11:00 pm", "23:00"
time_patterns = [
r'(\d{1,2}):?(\d{0,2})\s*(pm|am)',
r'(\d{1,2}):(\d{2})'
]
for pattern in time_patterns:
match = re.search(pattern, user_input_lower)
if match:
hour = int(match.group(1))
minute = int(match.group(2)) if match.group(2) else 0
if len(match.groups()) >= 3 and match.group(3): # Has AM/PM
ampm = match.group(3).upper()
specific_time = f"{hour:02d}:{minute:02d} {ampm}"
else: # 24-hour format
if hour > 12:
ampm = "PM"
hour = hour - 12 if hour > 12 else hour
else:
ampm = "AM" if hour < 12 else "PM"
specific_time = f"{hour:02d}:{minute:02d} {ampm}"
break
return {
"intent": intent,
"date": date,
"time_range": None,
"specific_time": specific_time,
"email": None
}
def call_gemini(user_input: str, context: Dict[str, Any] | None = None, retries: int = 3):
logger.info("Calling Gemini API with input: %s", user_input)
# Get current date and time in user's timezone
from datetime import datetime
import pytz
# Assume Indian timezone for now, but this should come from user settings
user_tz = pytz.timezone('Asia/Kolkata')
now = datetime.now(user_tz)
today = now.strftime("%Y-%m-%d")
current_time = now.strftime("%I:%M %p")
context_str = (
f"Previous conversation context: Intent={context['last_intent']}, Date={context.get('date')}, Time={context.get('time_range')}"
if context and context.get("last_intent")
else ""
)
appointment_intents = [
"show my appointments",
"my meetings",
"my calendar",
"my schedule",
"upcoming appointments",
"what meetings do i have",
"show appointments",
"view my schedule"
]
prompt = f"""
You are an appointment booking assistant. Analyze the user input and return a JSON object.
CURRENT CONTEXT:
- Current date: {today}
- Current time: {current_time}
- User timezone: Asia/Kolkata
{context_str}
IMPORTANT DATE/TIME PARSING RULES:
1. "today" = {today}
2. "tomorrow" = {(now + timedelta(days=1)).strftime("%Y-%m-%d")}
3. When user says "11 pm" or "11:00 pm", convert to "11:00 PM" format
4. When user says "23:00", convert to "11:00 PM" format
5. DO NOT convert times to UTC - keep them in user's local timezone
6. For booking appointments, if only a specific time is given (like "11 pm"), create a 30-minute slot
User input: "{user_input}"
Appointment-related intents: {appointment_intents}
If the user's message matches any of these, set "intent": "show_appointments".
Return JSON with:
- "intent": One of ["show_appointments", "book_appointment", "check_availability", "greeting", "unknown"]
- "date": Date in YYYY-MM-DD format (e.g., "{today}" for today) or null
- "time_range": {{"start": "HH:MM AM/PM", "end": "HH:MM AM/PM"}} or null
- "specific_time": Single time slot in 12-hour format (e.g., "11:00 PM") or null
- "email": Email address if mentioned or null
EXAMPLES:
Input: "book appointment today 11 pm"
Output: {{
"intent": "book_appointment",
"date": "{today}",
"time_range": null,
"specific_time": "11:00 PM",
"email": null
}}
Input: "book meeting tomorrow 2-4 PM"
Output: {{
"intent": "book_appointment",
"date": "{(now + timedelta(days=1)).strftime("%Y-%m-%d")}",
"time_range": {{"start": "2:00 PM", "end": "4:00 PM"}},
"specific_time": null,
"email": null
}}
Input: "show my appointments"
Output: {{
"intent": "show_appointments",
"date": null,
"time_range": null,
"specific_time": null,
"email": null
}}
"""
for model_name in GEMINI_MODELS:
try:
model = genai.GenerativeModel(model_name)
for attempt in range(retries):
try:
response = model.generate_content(prompt)
if not response or not response.text:
continue
text = response.text.strip()
if text.startswith("```json"):
text = text.replace("```json", "").replace("```", "").strip()
result = json.loads(text)
logger.info(f"Gemini parsed result: {result}")
return result
except Exception as e:
if "429" in str(e) and attempt < retries - 1:
time.sleep(2 ** attempt)
continue
logger.warning("Gemini error (%s): %s", model_name, e)
break
except Exception as e:
logger.warning("Model %s unavailable: %s", model_name, e)
# Fallback to simple parsing if Gemini fails
logger.warning("Gemini failed, using fallback parsing")
return fallback_parse_input(user_input, today)
class BookingRequest(BaseModel):
user_input: str
conversation_id: str
user_email: Optional[str] = None
from typing_extensions import TypedDict
class AgentState(TypedDict, total=False):
user_input: str
conversation_id: str
user_email: str
intent: str
date: Optional[str]
time_range: Optional[Dict[str, str]]
specific_time: Optional[str]
available_slots: List[str]
confirmed_slot: str
messages: List[Dict[str, str]]
last_intent: str
needs_more_info: bool
auth_required: bool
auth_url: str
def email_collection_node(state: Dict[str, Any]):
"""Check if user email is provided and handle authentication - ALWAYS runs first"""
state['messages'] = state.get('messages', [])
# ALWAYS check for authentication, regardless of intent
if not state.get('user_email'):
# Try to extract email from user input
import re
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
email_matches = re.findall(email_pattern, state['user_input'])
if email_matches:
state['user_email'] = email_matches[0]
logger.info(f"Email extracted: {state['user_email']}")
else:
# Try Gemini to extract email
gemini_result = call_gemini(state['user_input'])
if gemini_result.get('email'):
state['user_email'] = gemini_result['email']
logger.info(f"Email extracted via Gemini: {state['user_email']}")
else:
# Ask for email - even for simple greetings
greeting_response = "Hi! I'm your AI appointment booking assistant. "
if "hi" in state['user_input'].lower() or "hello" in state['user_input'].lower():
greeting_response += "Nice to meet you! "
greeting_response += "To access your calendar and help with appointments, please provide your email address."
state['messages'].append({
'role': 'assistant',
'content': greeting_response
})
state['needs_more_info'] = True
return state
# Check if user has valid cached credentials
if state.get('user_email'):
creds_json = load_user_credentials(state['user_email'])
if creds_json:
# User has valid cached credentials
try:
service = get_calendar_service(state['user_email'])
state['needs_more_info'] = False
state['auth_required'] = False
logger.info(f"Using cached credentials for {state['user_email']}")
return state
except Exception as e:
logger.error(f"Cached credentials failed for {state['user_email']}: {e}")
delete_user_credentials(state['user_email'])
state['auth_required'] = True
state['needs_more_info'] = False
# Generate auth URL immediately
try:
state['auth_url'] = generate_auth_url(state['user_email'])
except Exception as auth_e:
logger.error(f"Failed to generate auth URL: {auth_e}")
else:
state['auth_required'] = True
state['needs_more_info'] = False
# Generate auth URL immediately
try:
state['auth_url'] = generate_auth_url(state['user_email'])
except Exception as auth_e:
logger.error(f"Failed to generate auth URL: {auth_e}")
logger.info(f"No cached credentials for {state['user_email']}, requiring auth")
return state
def intent_detection_node(state: Dict[str, Any]):
if state.get("needs_more_info") or state.get("auth_required"):
return state
ctx = {
"last_intent": state.get("last_intent", ""),
"date": state.get("date"),
"time_range": state.get("time_range"),
}
gemini = call_gemini(state["user_input"], ctx)
state["intent"] = gemini.get("intent", "unknown")
state["date"] = gemini.get("date") or state.get("date")
state["time_range"] = gemini.get("time_range") or state.get("time_range")
state["specific_time"] = gemini.get("specific_time")
state["last_intent"] = state["intent"]
return state
def validation_node(state: Dict[str, Any]):
"""Enhanced validation that handles greetings and authentication properly"""
# If we're waiting for OAuth or need more info (like email), skip validation
if state.get("auth_required") or state.get("needs_more_info"):
return state
state["messages"] = state.get("messages", [])
user_input_lower = state.get("user_input", "").lower()
# Handle greetings when user is authenticated
if any(greeting in user_input_lower for greeting in ["hi", "hello", "hey", "good morning", "good afternoon"]):
state["messages"].append({
"role": "assistant",
"content": f"Hello! I'm your appointment assistant. I can help you:\n\nβ€’ Book new appointments\nβ€’ Check your availability\nβ€’ Show your upcoming meetings\n\nWhat would you like to do today?"
})
state["needs_more_info"] = True # Wait for their next input
return state
if state.get("intent") == "show_appointments":
state["needs_more_info"] = False
elif state.get("intent") in ["book_appointment", "check_availability"]:
if not state.get("date"):
state["messages"].append({
"role": "assistant",
"content": "I'd be happy to help! What date would you like? You can say something like 'tomorrow', 'Friday', or a specific date.",
})
state["needs_more_info"] = True
elif not state.get("time_range") and not state.get("specific_time"):
state["messages"].append({
"role": "assistant",
"content": f"Great! For {state['date']}, what time works for you? You can specify a range like '2-4 PM' or a specific time like '3 PM'.",
})
state["needs_more_info"] = True
else:
# Convert specific_time to time_range if needed (existing logic)
if state.get('specific_time') and not state.get('time_range'):
try:
time_str = state['specific_time']
# Handle 24-hour format
if ':' in time_str and 'AM' not in time_str.upper() and 'PM' not in time_str.upper():
# Convert 24-hour to 12-hour format
try:
dt = datetime.strptime(time_str, '%H:%M')
time_str = dt.strftime('%I:%M %p')
except ValueError:
raise ValueError("Invalid 24-hour time format")
elif ':' not in time_str and ('AM' in time_str.upper() or 'PM' in time_str.upper()):
# Handle formats like "8 PM"
time_str = time_str.replace(' ', ':00 ')
specific_dt = datetime.strptime(time_str, '%I:%M %p')
end_dt = specific_dt + timedelta(minutes=30)
state['time_range'] = {
'start': specific_dt.strftime('%I:%M %p'),
'end': end_dt.strftime('%I:%M %p')
}
logger.info("Converted specific time %s to range %s", time_str, state['time_range'])
except Exception as e:
logger.error("Time conversion error: %s", e)
state["messages"].append({
"role": "assistant",
"content": "Sorry, I couldn't understand the time format. Please specify like '8:00 PM', '8 PM', or '20:00'.",
})
state["needs_more_info"] = True
return state
state["needs_more_info"] = False
else:
# Unknown intent - provide helpful guidance
state["messages"].append({
"role": "assistant",
"content": "I can help you with appointments! Try saying something like:\n\nβ€’ 'Book a meeting tomorrow at 3 PM'\nβ€’ 'Check my availability Friday afternoon'\nβ€’ 'Show my upcoming appointments'\n\nWhat would you like to do?",
})
state["needs_more_info"] = True
return state
def availability_node(state: Dict[str, Any]):
"""Enhanced availability node with global slot checking and negotiation"""
if state.get("needs_more_info") or state.get("auth_required"):
return state
state.setdefault("messages", [])
if state.get("intent") in ["book_appointment", "check_availability"] and state.get("date") and state.get("time_range"):
date = state["date"]
time_range = state["time_range"]
start_time = time_range["start"]
end_time = time_range["end"]
# Check if the specific requested slot is available
if check_global_slot_availability(date, start_time, end_time):
state["available_slots"] = [f"{start_time} - {end_time}"]
state["messages"].append({
"role": "assistant",
"content": f"Great! The slot {start_time} - {end_time} on {date} is available. Would you like me to book it for you?",
})
else:
# Slot is not available - show conflicts and alternatives
conflicts = get_conflicting_appointments(date, start_time, end_time)
conflict_msg = f"Sorry, the slot {start_time} - {end_time} on {date} is already booked"
if conflicts:
conflict_msg += f" (booked by other user)"
# Get alternative slots for the same day
available_alternatives = get_available_slots_for_date(date)
if available_alternatives:
# Limit to first 5 alternatives for better UX
alt_slots = available_alternatives[:5]
state["available_slots"] = alt_slots
alternatives_text = "\n".join([f"β€’ {slot}" for slot in alt_slots])
state["messages"].append({
"role": "assistant",
"content": f"{conflict_msg}.\n\nHere are some available alternatives for {date}:\n\n{alternatives_text}\n\nWhich slot would you prefer? Just say the time (e.g., '2:00 PM - 2:30 PM')."
})
state["needs_more_info"] = True # Wait for user to choose alternative
else:
state["messages"].append({
"role": "assistant",
"content": f"{conflict_msg}.\n\nUnfortunately, there are no other available slots on {date}. Would you like to try a different date?"
})
state["needs_more_info"] = True
state["available_slots"] = []
elif state.get("intent") == "check_availability" and state.get("date"):
# Just checking availability, not booking
date = state["date"]
available_slots = get_available_slots_for_date(date)
if available_slots:
slots_text = "\n".join([f"β€’ {slot}" for slot in available_slots[:10]]) # Show max 10 slots
state["messages"].append({
"role": "assistant",
"content": f"Here are the available slots for {date}:\n\n{slots_text}\n\nWould you like to book any of these slots?"
})
state["available_slots"] = available_slots
else:
state["messages"].append({
"role": "assistant",
"content": f"Sorry, there are no available slots on {date}. Would you like to check a different date?"
})
state["available_slots"] = []
return state
def booking_node(state: Dict[str, Any]):
"""Enhanced booking node using global appointment system"""
if state.get("needs_more_info") or state.get("auth_required"):
return state
state.setdefault("messages", [])
if state.get("intent") == "book_appointment":
user_input_lower = state.get("user_input", "").lower()
# Check if user is selecting from available slots
if state.get("available_slots"):
selected_slot = None
# Try to match user input with available slots
for slot in state["available_slots"]:
# Extract time from slot format "HH:MM AM/PM - HH:MM AM/PM"
slot_times = slot.split(" - ")
if len(slot_times) == 2:
start_time = slot_times[0].strip()
end_time = slot_times[1].strip()
# Check if user mentioned this time
if (start_time.lower() in user_input_lower or
slot.lower() in user_input_lower or
start_time.replace(":00", "").lower() in user_input_lower):
selected_slot = slot
break
# If no specific slot mentioned, use the first available
if not selected_slot and state["available_slots"]:
selected_slot = state["available_slots"][0]
if selected_slot:
# Parse the selected slot
slot_parts = selected_slot.split(" - ")
if len(slot_parts) == 2:
start_time = slot_parts[0].strip()
end_time = slot_parts[1].strip()
date = state["date"]
user_email = state["user_email"]
# Book the appointment globally
result = book_global_appointment(user_email, date, start_time, end_time)
if result["status"] == "confirmed":
state["messages"].append({
"role": "assistant",
"content": f"βœ… Perfect! Your appointment is booked for {date} from {start_time} to {end_time}. Your appointment ID is #{result['appointment_id']}. Is there anything else I can help you with?"
})
# Clear the slots since booking is complete
state["available_slots"] = []
state["needs_more_info"] = False
elif result["status"] == "unavailable":
state["messages"].append({
"role": "assistant",
"content": f"Sorry, that slot was just booked by someone else! Let me check for other available slots..."
})
# Refresh available slots
new_slots = get_available_slots_for_date(date)
state["available_slots"] = new_slots[:5]
if new_slots:
alternatives_text = "\n".join([f"β€’ {slot}" for slot in new_slots[:5]])
state["messages"].append({
"role": "assistant",
"content": f"Here are the updated available slots:\n\n{alternatives_text}\n\nWhich one would you like?"
})
state["needs_more_info"] = True
else:
state["messages"].append({
"role": "assistant",
"content": f"Sorry, I encountered an error while booking: {result.get('error', 'Unknown error')}. Please try again."
})
state["needs_more_info"] = True
else:
state["messages"].append({
"role": "assistant",
"content": "I couldn't parse the selected time slot. Please specify the time again."
})
state["needs_more_info"] = True
else:
state["messages"].append({
"role": "assistant",
"content": "I couldn't understand which slot you'd like. Please specify the exact time (e.g., '2:00 PM - 2:30 PM') from the available options."
})
state["needs_more_info"] = True
else:
# No available slots, ask for different time/date
state["messages"].append({
"role": "assistant",
"content": "I don't have any available slots to book. Would you like to try a different date or time?"
})
state["needs_more_info"] = True
return state
def show_appointments_node(state: Dict[str, Any]):
"""Enhanced show appointments using global database"""
if state.get("needs_more_info") or state.get("auth_required"):
return state
state.setdefault("messages", [])
if state.get("intent") == "show_appointments" and state.get("user_email"):
appointments = get_all_global_appointments(state["user_email"])
if appointments:
# Format appointments for display
appt_list = []
for appt in appointments:
appt_list.append(f"β€’ **{appt['title']}** on {appt['date']} from {appt['start_time']} to {appt['end_time']} (ID: #{appt['id']})")
reply = "Here are your upcoming appointments:\n\n" + "\n".join(appt_list)
reply += "\n\nWould you like to book another appointment or make any changes?"
else:
reply = "You have no upcoming appointments. Would you like to book one?"
state["messages"].append({"role": "assistant", "content": reply})
return state
browser_sessions: Dict[str, Dict[str, any]] = {}
def init_browser_sessions_db():
"""Initialize browser sessions table in database"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute('''
CREATE TABLE IF NOT EXISTS browser_sessions (
session_id TEXT PRIMARY KEY,
user_email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def store_browser_session(session_id: str, user_email: str):
"""Store browser session in database"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute('''
INSERT OR REPLACE INTO browser_sessions (session_id, user_email, last_accessed)
VALUES (?, ?, CURRENT_TIMESTAMP)
''', (session_id, user_email))
conn.commit()
conn.close()
def get_session_user(session_id: str) -> str:
"""Get user email for a browser session"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute('SELECT user_email FROM browser_sessions WHERE session_id = ?', (session_id,))
result = c.fetchone()
conn.close()
return result[0] if result else None
def get_user_sessions(user_email: str) -> List[str]:
"""Get all browser sessions for a user"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute('SELECT session_id FROM browser_sessions WHERE user_email = ?', (user_email,))
results = c.fetchall()
conn.close()
return [r[0] for r in results]
def delete_browser_session(session_id: str, user_email: str = None):
"""Delete a specific browser session"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
if user_email:
c.execute('DELETE FROM browser_sessions WHERE session_id = ? AND user_email = ?',
(session_id, user_email))
else:
c.execute('DELETE FROM browser_sessions WHERE session_id = ?', (session_id,))
conn.commit()
conn.close()
def cleanup_old_sessions():
"""Clean up sessions older than 30 days"""
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute('''
DELETE FROM browser_sessions
WHERE last_accessed < datetime('now', '-30 days')
''')
conn.commit()
conn.close()
def create_workflow():
workflow = StateGraph(AgentState)
workflow.add_node("email_collection", email_collection_node)
workflow.add_node("intent_detection", intent_detection_node)
workflow.add_node("validation", validation_node)
workflow.add_node("availability", availability_node)
workflow.add_node("booking", booking_node)
workflow.add_node("show_appointments", show_appointments_node)
# ALWAYS start with email collection/authentication
workflow.set_entry_point("email_collection")
# From email collection, route based on authentication status
workflow.add_conditional_edges(
"email_collection",
lambda state: (
"end" if state.get('needs_more_info', False) or state.get('auth_required', False)
else "intent_detection" # Only proceed if authenticated and have email
),
{
"intent_detection": "intent_detection",
"end": END
}
)
workflow.add_edge("intent_detection", "validation")
# Updated conditional routing from validation
workflow.add_conditional_edges(
"validation",
lambda state: (
"end" if state.get('auth_required', False) or state.get('needs_more_info', True)
else "show_appointments" if state.get('intent') == 'show_appointments'
else "availability" if state.get('intent') in ['book_appointment', 'check_availability']
else "end"
),
{
"availability": "availability",
"show_appointments": "show_appointments",
"end": END
}
)
workflow.add_edge("availability", "booking")
workflow.add_edge("booking", END)
workflow.add_edge("show_appointments", END)
return workflow.compile()
graph = create_workflow()
@app.post("/store_session_user")
async def store_session_user(request: dict):
"""Store user-session mapping"""
try:
session_id = request.get("browser_session_id")
user_email = request.get("user_email")
if session_id and user_email:
store_browser_session(session_id, user_email)
return {"status": "success"}
else:
return {"status": "error", "message": "Missing session_id or user_email"}
except Exception as e:
logger.exception("Error storing session user")
return {"status": "error", "message": str(e)}
@app.get("/get_session_user")
async def get_session_user_endpoint(browser_session_id: str):
"""Get user for a browser session"""
try:
user_email = get_session_user(browser_session_id)
if user_email:
# Check if user is still authenticated
creds_json = load_user_credentials(user_email)
authorized = False
if creds_json:
try:
service = get_calendar_service(user_email)
authorized = True
except Exception:
delete_user_credentials(user_email)
return {
"user_email": user_email,
"authorized": authorized
}
else:
return {"user_email": None, "authorized": False}
except Exception as e:
logger.exception("Error getting session user")
return {"user_email": None, "authorized": False, "error": str(e)}
@app.post("/initiate_auth")
async def initiate_auth(request: dict):
"""Initiate authentication flow - SECURE VERSION"""
try:
user_email = request.get("user_email")
browser_session_id = request.get("browser_session_id")
if not user_email:
return {"status": "error", "message": "user_email is required"}
# SECURITY: Always generate fresh auth URL for new browser sessions
# Don't allow reusing credentials across browser sessions
auth_url = generate_auth_url(user_email)
# Store pending session
if browser_session_id:
browser_sessions[browser_session_id] = {
"user_email": user_email,
"status": "pending_auth",
"auth_url": auth_url
}
return {
"status": "pending_auth",
"auth_url": auth_url,
"message": f"Fresh auth required for {user_email}"
}
except Exception as e:
logger.exception(f"Error initiating auth")
return {
"status": "error",
"auth_url": "",
"message": f"Failed to initiate auth: {str(e)}"
}
@app.get("/authenticated_users")
async def get_authenticated_users(browser_session_id: str = None):
"""Get list of users who have valid authentication for this SPECIFIC browser session"""
try:
browser_authenticated_users = []
if browser_session_id:
# SECURITY: Only get users for THIS specific browser session
user_email = get_session_user(browser_session_id)
if user_email:
# Verify credentials are still valid
creds_json = load_user_credentials(user_email)
if creds_json:
try:
service = get_calendar_service(user_email)
browser_authenticated_users.append(user_email)
except Exception:
delete_user_credentials(user_email)
delete_browser_session(browser_session_id, user_email)
return {
"browser_authenticated_users": browser_authenticated_users,
"count": len(browser_authenticated_users)
}
except Exception as e:
logger.exception("Error getting authenticated users")
return {"error": str(e), "browser_authenticated_users": []}
@app.get("/user_status/{user_email}")
async def get_user_status(user_email: str):
"""Get detailed status for a specific user"""
try:
# Check if user has valid credentials
creds_json = load_user_credentials(user_email)
if not creds_json:
return {
"user_email": user_email,
"authenticated": False,
"status": "no_credentials"
}
# Try to get calendar service
try:
service = get_calendar_service(user_email)
return {
"user_email": user_email,
"authenticated": True,
"status": "valid_credentials",
"can_access_calendar": True
}
except Exception as e:
# Credentials exist but are invalid
delete_user_credentials(user_email)
return {
"user_email": user_email,
"authenticated": False,
"status": "invalid_credentials",
"error": str(e)
}
except Exception as e:
logger.exception(f"Error checking status for {user_email}")
return {
"user_email": user_email,
"authenticated": False,
"status": "error",
"error": str(e)
}
@app.get("/database_status")
async def database_status():
try:
healthy = check_database_health()
conn = sqlite3.connect("user_data.db")
c = conn.cursor()
c.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = [t[0] for t in c.fetchall()]
counts = {tbl: c.execute(f"SELECT COUNT(*) FROM {tbl}").fetchone()[0] for tbl in tables}
conn.close()
return {
"healthy": healthy,
"tables": tables,
"record_counts": counts,
"in_memory_users": list(user_credentials.keys()),
}
except Exception as e:
return {"error": str(e), "healthy": False}
@app.post("/fix_database")
async def fix_database():
try:
reset_database()
return {"status": "success"}
except Exception as e:
return {"status": "error", "message": str(e)}
@app.post("/process_input")
async def process_input(req: BookingRequest):
try:
state = load_conversation_state(req.conversation_id)
user_email = req.user_email or state.get("user_email")
# Preserve the entire conversation context
state.update(
{
"user_input": req.user_input,
"conversation_id": req.conversation_id,
"user_email": user_email,
"intent": state.get("intent", ""),
"date": state.get("date"),
"time_range": state.get("time_range"),
"specific_time": state.get("specific_time"),
"available_slots": state.get("available_slots", []),
"confirmed_slot": state.get("confirmed_slot", ""),
"messages": state.get("messages", [])
+ [{"role": "user", "content": req.user_input}],
"last_intent": state.get("last_intent", ""),
"needs_more_info": state.get("needs_more_info", False),
"auth_required": state.get("auth_required", False),
"auth_url": state.get("auth_url", ""),
}
)
final_state = graph.invoke(state)
save_conversation_state(req.conversation_id, final_state)
assistant_msgs = [m for m in final_state.get("messages", []) if m["role"] == "assistant"]
reply = assistant_msgs[-1]["content"] if assistant_msgs else "I'm here to help!"
return {"response": reply, "state": final_state}
except Exception as e:
logger.exception("process_input failed")
return {"response": f"Error: {e}", "state": {}}
@app.post("/complete_auth")
async def complete_auth(user_email: str, auth_code: str):
return {
"status": "success" if complete_oauth_flow(user_email, auth_code) else "error"
}
@app.get("/callback")
async def oauth_callback(code: str):
# Identify pending user
user_email = next((e for e, d in user_credentials.items() if d.get("status") == "pending_auth"), None)
if not user_email:
return RedirectResponse("http://localhost:8501?error=no_pending_auth")
flow = user_credentials[user_email]["flow"]
flow.fetch_token(code=code)
store_user_credentials(user_email, flow.credentials)
user_credentials[user_email]["status"] = "completed"
return RedirectResponse("http://localhost:8501?auth=success")
@app.get("/check_auth")
async def check_auth(user_email: str, browser_session_id: str = ""):
"""Check authentication status for a user in a specific browser session - SECURE"""
try:
# SECURITY: Check if this user is associated with this browser session
session_user = get_session_user(browser_session_id) if browser_session_id else None
# If user is not associated with this browser session, require fresh auth
if browser_session_id and session_user != user_email:
# Generate fresh auth URL
auth_url = generate_auth_url(user_email)
if browser_session_id:
browser_sessions[browser_session_id] = {
"user_email": user_email,
"status": "pending_auth",
"auth_url": auth_url
}
return {
"authorized": False,
"status": "pending_auth",
"auth_url": auth_url
}
# Check if user has valid credentials
creds = load_user_credentials(user_email)
if creds:
try:
service = get_calendar_service(user_email)
# Store valid session mapping
if browser_session_id:
store_browser_session(browser_session_id, user_email)
return {"authorized": True, "status": "completed", "auth_url": ""}
except Exception:
# Credentials exist but are invalid
delete_user_credentials(user_email)
if browser_session_id:
delete_browser_session(browser_session_id, user_email)
# Generate new auth URL
auth_url = generate_auth_url(user_email)
if browser_session_id:
browser_sessions[browser_session_id] = {
"user_email": user_email,
"status": "pending_auth",
"auth_url": auth_url
}
return {
"authorized": False,
"status": "pending_auth",
"auth_url": auth_url
}
except Exception as e:
logger.exception(f"Error in check_auth for {user_email}")
return {
"authorized": False,
"status": "error",
"auth_url": "",
"error": str(e)
}
@app.post("/logout")
async def logout(request: dict):
"""Logout user from specific browser session"""
try:
user_email = request.get("user_email")
browser_session_id = request.get("browser_session_id")
if browser_session_id:
# Remove only this browser session
delete_browser_session(browser_session_id, user_email)
browser_sessions.pop(browser_session_id, None)
else:
# Fallback to original behavior - remove all user credentials
delete_user_credentials(user_email)
user_credentials.pop(user_email, None)
return {"status": "success"}
except Exception as e:
logger.exception(f"Error in logout")
return {"status": "error", "message": str(e)}
def run_fastapi():
uvicorn.run(app, host="127.0.0.1", port=8080, log_level="info")
def _periodic_cleanup():
while True:
time.sleep(300) # 5 minutes
cleanup_expired_credentials()
cleanup_old_sessions() # Clean old browser sessions
threading.Thread(target=_periodic_cleanup, daemon=True).start()
import time
import uuid
import requests
import streamlit as st
import urllib.parse
import hashlib
import json
from datetime import datetime, timedelta
import streamlit.components.v1 as components
BACKEND = "http://127.0.0.1:8080"
# ---------------------------------------------------------------------------
# Cookie Management Functions
# ---------------------------------------------------------------------------
def set_browser_cookie(key: str, value: str, days: int = 30):
"""Set a cookie in the browser"""
cookie_script = f"""
<script>
function setCookie(name, value, days) {{
var expires = "";
if (days) {{
var date = new Date();
date.setTime(date.getTime() + (days*24*60*60*1000));
expires = "; expires=" + date.toUTCString();
}}
document.cookie = name + "=" + (value || "") + expires + "; path=/";
}}
setCookie("{key}", "{value}", {days});
</script>
"""
components.html(cookie_script, height=0)
def get_browser_cookie(key: str) -> str:
"""Get a cookie value from the browser"""
cookie_script = f"""
<script>
function getCookie(name) {{
var nameEQ = name + "=";
var ca = document.cookie.split(';');
for(var i=0;i < ca.length;i++) {{
var c = ca[i];
while (c.charAt(0)==' ') c = c.substring(1,c.length);
if (c.indexOf(nameEQ) == 0) return c.substring(nameEQ.length,c.length);
}}
return null;
}}
var cookieValue = getCookie("{key}");
if (cookieValue) {{
window.parent.postMessage({{
type: 'cookie_value',
key: '{key}',
value: cookieValue
}}, '*');
}}
</script>
"""
components.html(cookie_script, height=0)
def delete_browser_cookie(key: str):
"""Delete a cookie from the browser"""
cookie_script = f"""
<script>
function deleteCookie(name) {{
document.cookie = name + '=; expires=Thu, 01 Jan 1970 00:00:01 GMT; path=/';
}}
deleteCookie("{key}");
</script>
"""
components.html(cookie_script, height=0)
def generate_browser_session_id():
"""Generate a unique session ID for this browser"""
import random
import string
return ''.join(random.choices(string.ascii_letters + string.digits, k=32))
# ---------------------------------------------------------------------------
# Simplified Authentication Functions
# ---------------------------------------------------------------------------
def get_authenticated_users_for_browser():
"""Get list of users authenticated in this specific browser session ONLY"""
if 'browser_session_id' not in st.session_state:
return []
try:
response = requests.get(
f"{BACKEND}/authenticated_users",
params={"browser_session_id": st.session_state.browser_session_id},
timeout=5
)
if response.status_code == 200:
data = response.json()
# SECURITY: Only return users for THIS browser session
return data.get("browser_authenticated_users", [])
except Exception as e:
st.error(f"Error checking authenticated users: {e}")
return []
def validate_user_with_backend(user_email: str) -> dict:
"""Validate user authentication with backend and get auth status"""
try:
params = {
"user_email": user_email,
"browser_session_id": st.session_state.get('browser_session_id', '')
}
response = requests.get(
f"{BACKEND}/check_auth",
params=params,
timeout=5
)
if response.status_code == 200:
result = response.json()
return {
'authorized': result.get('authorized', False),
'status': result.get('status', 'unknown'),
'auth_url': result.get('auth_url', '')
}
except Exception as e:
st.error(f"Error validating user: {e}")
return {'authorized': False, 'status': 'error', 'auth_url': ''}
def initiate_auth_for_user(user_email: str) -> dict:
"""Initiate authentication flow for a user"""
try:
data = {
"user_email": user_email,
"browser_session_id": st.session_state.get('browser_session_id', '')
}
response = requests.post(
f"{BACKEND}/initiate_auth",
json=data,
timeout=5
)
if response.status_code == 200:
result = response.json()
return {
'success': True,
'auth_url': result.get('auth_url', ''),
'status': result.get('status', 'pending_auth')
}
except Exception as e:
st.error(f"Error initiating auth: {e}")
return {'success': False, 'auth_url': '', 'status': 'error'}
def initialize_browser_session():
"""Initialize browser session with cookie management"""
# Generate or retrieve browser session ID
if 'browser_session_id' not in st.session_state:
# Try to get from cookie first
if 'cookie_checked' not in st.session_state:
st.session_state.cookie_checked = True
# This will trigger the cookie retrieval
get_browser_cookie('appointment_session_id')
# Wait a bit for cookie retrieval (in real implementation, you'd handle this differently)
time.sleep(0.1)
# If no cookie found, generate new session ID
if 'browser_session_id' not in st.session_state:
st.session_state.browser_session_id = generate_browser_session_id()
# Set cookie for this browser
set_browser_cookie('appointment_session_id', st.session_state.browser_session_id)
# Check if user was previously authenticated in this browser
if st.session_state.get('browser_session_id') and not st.session_state.get('user_email'):
try:
response = requests.get(
f"{BACKEND}/get_session_user",
params={"browser_session_id": st.session_state.browser_session_id},
timeout=5
)
if response.status_code == 200:
result = response.json()
if result.get('user_email'):
st.session_state.user_email = result['user_email']
st.session_state.auth_required = not result.get('authorized', False)
except Exception:
pass
def get_authenticated_users():
"""Get list of users who have valid authentication from backend"""
try:
response = requests.get(f"{BACKEND}/authenticated_users", timeout=5)
if response.status_code == 200:
data = response.json()
return data.get("authenticated_users", [])
except Exception as e:
st.error(f"Error checking authenticated users: {e}")
return []
def check_user_authentication(email):
"""Check if a specific user has valid authentication"""
try:
response = requests.get(f"{BACKEND}/user_status/{email}", timeout=5)
if response.status_code == 200:
result = response.json()
return result.get('authenticated', False)
except Exception:
pass
return False
def handle_streamlit_auth() -> bool:
"""Handle the authentication flow"""
if st.session_state.get("auth_required") and st.session_state.get("auth_url"):
st.warning("πŸ” Google Calendar authorization required")
col1, col2 = st.columns([3, 1])
with col1:
st.markdown(f"""
**Step 1:** [Click here to authorize Google Calendar access]({st.session_state.auth_url})
**Step 2:** Complete the authorization in the new tab
**Step 3:** Return to this page and click "Check Authorization"
""")
with col2:
if st.button("πŸ”„ Check Authorization", key="manual_check"):
try:
auth_result = validate_user_with_backend(st.session_state.user_email)
if auth_result['authorized']:
st.success("βœ… Authorization completed!")
st.session_state.auth_required = False
st.session_state.auth_url = ""
st.rerun()
else:
st.info(f"Status: {auth_result.get('status', 'pending')}")
st.info("Please complete the authorization in the other tab first.")
except Exception as e:
st.error(f"Error checking auth status: {e}")
return True
return False
def render_login_interface():
"""Render the login interface in sidebar - SECURE VERSION"""
st.subheader("πŸ” Account Login")
# Get authenticated users for THIS specific browser only
authenticated_users = get_authenticated_users_for_browser()
if authenticated_users:
# Show quick login for users authenticated in THIS browser
st.markdown("**Your authenticated accounts:**")
for i, user in enumerate(authenticated_users):
if st.button(f"πŸ“§ {user}", key=f"quick_login_{i}", use_container_width=True):
st.session_state.user_email = user
# Verify authentication for this specific browser session
auth_result = validate_user_with_backend(user)
if auth_result['authorized']:
st.session_state.auth_required = False
st.success(f"Logged in as {user}")
st.rerun()
else:
st.session_state.auth_required = True
st.session_state.auth_url = auth_result.get('auth_url', '')
if not st.session_state.auth_url:
auth_init = initiate_auth_for_user(user)
st.session_state.auth_url = auth_init.get('auth_url', '')
st.warning("Re-authentication required")
st.rerun()
st.markdown("---")
# Always show option to add new account
st.markdown("**Add new account:**")
new_email = st.text_input(
"Email address:",
placeholder="your.email@gmail.com",
key="new_email_input",
label_visibility="collapsed"
)
if st.button("Continue with Email", disabled=not new_email, use_container_width=True):
if new_email and "@" in new_email:
st.session_state.user_email = new_email
# Always require fresh authentication for new emails
st.session_state.auth_required = True
auth_init = initiate_auth_for_user(new_email)
st.session_state.auth_url = auth_init.get('auth_url', '')
st.info("Authentication required for this account")
st.rerun()
else:
st.error("Please enter a valid email address")
# ---------------------------------------------------------------------------
# Main Streamlit Application
# ---------------------------------------------------------------------------
def main():
st.set_page_config(page_title="AI Appointment Assistant", page_icon="πŸ“…")
# Initialize browser session first
initialize_browser_session()
# Initialize session state
if "conversation_id" not in st.session_state:
st.session_state.conversation_id = str(uuid.uuid4())
if "messages" not in st.session_state:
st.session_state.messages = []
if "user_email" not in st.session_state:
st.session_state.user_email = None
if "auth_required" not in st.session_state:
st.session_state.auth_required = False
if "auth_url" not in st.session_state:
st.session_state.auth_url = ""
# Handle URL parameters for auth success/error
try:
if hasattr(st, "query_params"):
qp = st.query_params
if "auth" in qp and qp["auth"] == "success":
st.success("βœ… Authorization successful! You can now book appointments.")
st.session_state.auth_required = False
st.session_state.auth_url = ""
qp.clear()
st.rerun()
elif "error" in qp:
st.error(f"❌ Authorization failed: {qp['error']}")
st.session_state.auth_required = False
st.session_state.auth_url = ""
qp.clear()
except Exception:
pass
st.title("πŸ“… AI Appointment Assistant")
# Sidebar for user authentication
with st.sidebar:
if st.session_state.user_email:
# User is logged in - show status and logout
st.success(f"**Logged in as:**\n{st.session_state.user_email}")
st.info(f"**Browser Session:** {st.session_state.browser_session_id[:8]}...")
if st.session_state.auth_required:
st.warning("⚠️ Authorization required")
else:
st.success("βœ… Ready to help!")
# Account actions
st.markdown("---")
col1, col2 = st.columns(2)
with col1:
if st.button("πŸ”„ Refresh", key="refresh_auth", use_container_width=True):
auth_result = validate_user_with_backend(st.session_state.user_email)
if auth_result['authorized']:
st.session_state.auth_required = False
st.session_state.auth_url = ""
st.success("βœ… Refreshed!")
else:
st.session_state.auth_required = True
st.session_state.auth_url = auth_result.get('auth_url', '')
st.warning("Re-auth needed")
st.rerun()
with col2:
if st.button("πŸ”’ Logout", key="logout_btn", use_container_width=True):
try:
# Logout from this browser session
response = requests.post(
f"{BACKEND}/logout",
json={
"user_email": st.session_state.user_email,
"browser_session_id": st.session_state.browser_session_id
},
timeout=5,
)
except Exception as e:
st.error(f"Logout error: {e}")
# Clear cookies
delete_browser_cookie('appointment_session_id')
# Clear session state
st.session_state.user_email = None
st.session_state.auth_required = False
st.session_state.auth_url = ""
st.session_state.messages = []
st.session_state.conversation_id = str(uuid.uuid4())
# Generate new browser session ID
st.session_state.browser_session_id = generate_browser_session_id()
set_browser_cookie('appointment_session_id', st.session_state.browser_session_id)
st.success("πŸ‘‹ Logged out!")
st.rerun()
else:
# User not logged in - show login interface
render_login_interface()
# Handle authentication flow
if handle_streamlit_auth():
return
# Show appropriate content based on login status
if not st.session_state.user_email:
# Welcome screen for new users
st.markdown("""
### Welcome! πŸ‘‹
I'm your AI appointment assistant. I can help you:
- πŸ“… **Book appointments** - Just say "Book a meeting tomorrow at 3 PM"
- πŸ” **Check availability** - Ask "Am I free Friday afternoon?"
- πŸ“‹ **View your schedule** - Say "Show my appointments"
**To get started, please log in using the sidebar** πŸ‘ˆ
""")
# Add some example interactions
with st.expander("πŸ’‘ See example conversations"):
st.markdown("""
**Booking an appointment:**
- "Book a meeting tomorrow at 2 PM"
- "Schedule a call with John next Friday from 10-11 AM"
**Checking availability:**
- "Am I free this Thursday afternoon?"
- "What's my availability next week?"
**Viewing schedule:**
- "Show my appointments"
- "What meetings do I have today?"
""")
return
# Main chat interface - only show if user is selected
if st.session_state.user_email and not st.session_state.auth_required:
# Display chat history
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Chat input
if user_input := st.chat_input("Ask me to book an appointment or check availability..."):
# Add user message to chat
st.session_state.messages.append({"role": "user", "content": user_input})
with st.chat_message("user"):
st.markdown(user_input)
# Process user input
try:
with st.spinner("Thinking..."):
response = requests.post(
f"{BACKEND}/process_input",
json={
"user_input": user_input,
"conversation_id": st.session_state.conversation_id,
"user_email": st.session_state.user_email,
},
timeout=30,
)
if response.status_code == 200:
result = response.json()
backend_state = result.get("state", {})
# Update auth state from backend
st.session_state.auth_required = backend_state.get("auth_required", False)
st.session_state.auth_url = backend_state.get("auth_url", "")
if not st.session_state.auth_required:
# Show assistant response
with st.chat_message("assistant"):
st.markdown(result["response"])
st.session_state.messages.append({
"role": "assistant",
"content": result["response"]
})
else:
# Need to re-authenticate
st.rerun()
else:
error_msg = "Sorry, I'm having trouble processing your request. Please try again."
with st.chat_message("assistant"):
st.markdown(error_msg)
st.session_state.messages.append({
"role": "assistant",
"content": error_msg
})
except requests.exceptions.RequestException:
error_msg = "Connection error. Please make sure the server is running and try again."
with st.chat_message("assistant"):
st.markdown(error_msg)
st.session_state.messages.append({
"role": "assistant",
"content": error_msg
})
except Exception as e:
error_msg = f"An unexpected error occurred: {str(e)}"
with st.chat_message("assistant"):
st.markdown(error_msg)
st.session_state.messages.append({
"role": "assistant",
"content": error_msg
})
# Show welcome message for authenticated users with empty chat
elif st.session_state.user_email and not st.session_state.auth_required and not st.session_state.messages:
st.success(f"βœ… Ready to help! Authenticated as: {st.session_state.user_email}")
st.markdown("""
### What would you like to do?
You can ask me to:
- Book a new appointment
- Check your availability
- Show your upcoming meetings
Just type your request below! πŸ‘‡
""")
# Debug info in sidebar (optional - remove in production)
with st.sidebar:
if st.checkbox("πŸ”§ Debug Info", key="debug_toggle"):
st.markdown("---")
st.subheader("Debug Information")
st.json({
"user_email": st.session_state.user_email,
"auth_required": st.session_state.auth_required,
"has_auth_url": bool(st.session_state.auth_url),
"conversation_id": st.session_state.conversation_id,
"message_count": len(st.session_state.messages)
})
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment