Skip to content

Instantly share code, notes, and snippets.

@shricodev
Created August 4, 2025 08:00
Show Gist options
  • Select an option

  • Save shricodev/cf1d11f500ab270937582ff01eb63135 to your computer and use it in GitHub Desktop.

Select an option

Save shricodev/cf1d11f500ab270937582ff01eb63135 to your computer and use it in GitHub Desktop.
CLI MCP Chat Client with Composio (Developed by Claude Sonnet 4 AI Model) - Blog Demo
#!/usr/bin/env python3
"""
Enhanced Chat Client
Features:
- Robust connection handling
- Rich UI with colors and formatting
- Automatic reconnection
- Message history
- Command support
- Connection status monitoring
"""
import json
import os
import signal
import socket
import sys
import threading
import time
from datetime import datetime
from typing import List, Optional
# Color codes for terminal output
class Colors:
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
# Text colors
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# Bright colors
BRIGHT_RED = "\033[91m"
BRIGHT_GREEN = "\033[92m"
BRIGHT_YELLOW = "\033[93m"
BRIGHT_BLUE = "\033[94m"
BRIGHT_MAGENTA = "\033[95m"
BRIGHT_CYAN = "\033[96m"
BRIGHT_WHITE = "\033[97m"
class ChatClient:
def __init__(self):
self.client_socket = None
self.connected = False
self.username = ""
self.server_ip = ""
self.server_port = 12345
self.running = False
self.message_history: List[str] = []
self.client_count = 0
self.last_ping = time.time()
# Setup signal handlers
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
print(f"\n{Colors.YELLOW}πŸ‘‹ Goodbye!{Colors.RESET}")
self.disconnect()
sys.exit(0)
def clear_screen(self):
"""Clear terminal screen"""
os.system("cls" if os.name == "nt" else "clear")
def print_header(self):
"""Print application header"""
self.clear_screen()
print(f"{Colors.BRIGHT_CYAN}{Colors.BOLD}")
print("=" * 60)
print("πŸ—£οΈ ENHANCED CHAT CLIENT")
print("=" * 60)
print(f"{Colors.RESET}")
def print_status(self, message, color=Colors.WHITE):
"""Print status message with color"""
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"{Colors.DIM}[{timestamp}]{Colors.RESET} {color}{message}{Colors.RESET}")
def get_connection_details(self):
"""Get server connection details from user"""
self.print_header()
print(f"{Colors.BRIGHT_YELLOW}πŸ“‘ Server Connection Setup{Colors.RESET}")
print("-" * 30)
while True:
try:
# Get server IP
server_input = input(
f"{Colors.CYAN}Enter server IP (default: 127.0.0.1): {Colors.RESET}"
).strip()
self.server_ip = server_input if server_input else "127.0.0.1"
# Validate IP format (basic validation)
if self.server_ip != "127.0.0.1" and self.server_ip != "localhost":
ip_parts = self.server_ip.split(".")
if len(ip_parts) != 4 or not all(
part.isdigit() and 0 <= int(part) <= 255 for part in ip_parts
):
print(f"{Colors.RED}❌ Invalid IP address format{Colors.RESET}")
continue
# Get server port
port_input = input(
f"{Colors.CYAN}Enter server port (default: 12345): {Colors.RESET}"
).strip()
if port_input:
self.server_port = int(port_input)
if not (1 <= self.server_port <= 65535):
print(
f"{Colors.RED}❌ Port must be between 1 and 65535{Colors.RESET}"
)
continue
break
except ValueError:
print(f"{Colors.RED}❌ Invalid port number{Colors.RESET}")
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}πŸ‘‹ Goodbye!{Colors.RESET}")
sys.exit(0)
def get_username(self):
"""Get username from user"""
while True:
try:
username = input(
f"{Colors.CYAN}Enter your username (1-20 characters): {Colors.RESET}"
).strip()
if not username:
print(f"{Colors.RED}❌ Username cannot be empty{Colors.RESET}")
continue
if len(username) > 20:
print(
f"{Colors.RED}❌ Username too long (max 20 characters){Colors.RESET}"
)
continue
if not username.replace("_", "").replace("-", "").isalnum():
print(
f"{Colors.RED}❌ Username can only contain letters, numbers, hyphens, and underscores{Colors.RESET}"
)
continue
self.username = username
break
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}πŸ‘‹ Goodbye!{Colors.RESET}")
sys.exit(0)
def get_user_email(self):
"""Get user email for MCP tool access (optional)"""
try:
print(f"\n{Colors.BRIGHT_MAGENTA}πŸ”§ MCP Tool Access Setup{Colors.RESET}")
print(
f"{Colors.DIM}Enter your email to enable AI tool access (Gmail, Calendar, etc.){Colors.RESET}"
)
print(f"{Colors.DIM}Or press Enter to skip tool features{Colors.RESET}")
email = input(f"{Colors.CYAN}Email (optional): {Colors.RESET}").strip()
if email and "@" in email:
return email
elif email:
print(
f"{Colors.YELLOW}⚠️ Invalid email format, skipping tool access{Colors.RESET}"
)
time.sleep(1)
return None
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}πŸ‘‹ Goodbye!{Colors.RESET}")
sys.exit(0)
def connect_to_server(self, max_retries=3):
"""Connect to the chat server with retry logic"""
for attempt in range(max_retries):
try:
self.print_status(
f"πŸ”„ Connecting to {self.server_ip}:{self.server_port} (attempt {attempt + 1}/{max_retries})",
Colors.YELLOW,
)
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.settimeout(10) # 10 second timeout
# Connect to server
self.client_socket.connect((self.server_ip, self.server_port))
self.connected = True
self.print_status("βœ… Connected to server successfully!", Colors.GREEN)
# Send username
username_message = json.dumps(
{"type": "auth", "message": self.username}
)
self.client_socket.send(username_message.encode("utf-8"))
# Send email if provided
if hasattr(self, "user_email") and self.user_email:
# Wait for email prompt from server
time.sleep(0.1)
email_message = json.dumps(
{"type": "auth", "message": self.user_email}
)
self.client_socket.send(email_message.encode("utf-8"))
return True
except socket.timeout:
self.print_status("⏰ Connection timeout", Colors.RED)
except ConnectionRefusedError:
self.print_status(
"❌ Connection refused - server may be down", Colors.RED
)
except Exception as e:
self.print_status(f"❌ Connection error: {e}", Colors.RED)
if attempt < max_retries - 1:
self.print_status("⏳ Retrying in 2 seconds...", Colors.YELLOW)
time.sleep(2)
return False
def receive_messages(self):
"""Receive messages from server in a separate thread"""
buffer = ""
while self.running and self.connected:
try:
data = self.client_socket.recv(1024).decode("utf-8")
if not data:
break
buffer += data
# Process complete messages (separated by newlines)
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
if line.strip():
self.process_received_message(line.strip())
self.last_ping = time.time()
except socket.timeout:
# Check if connection is still alive
if time.time() - self.last_ping > 60: # 1 minute timeout
self.print_status("⚠️ Connection timeout", Colors.RED)
break
except socket.error:
break
except Exception as e:
self.print_status(f"❌ Receive error: {e}", Colors.RED)
break
if self.running:
self.print_status("❌ Connection lost", Colors.RED)
self.connected = False
def process_received_message(self, message_str):
"""Process received message from server"""
try:
message_data = json.loads(message_str)
msg_type = message_data.get("type", "unknown")
content = message_data.get("message", "")
if msg_type == "chat":
# Regular chat message
username = message_data.get("username", "Unknown")
timestamp = message_data.get("timestamp", "")
if timestamp:
time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
time_str = time_obj.strftime("%H:%M:%S")
else:
time_str = datetime.now().strftime("%H:%M:%S")
# Color code based on username
if username == self.username:
username_color = Colors.BRIGHT_GREEN
else:
# Simple hash-based color assignment
colors = [
Colors.BLUE,
Colors.MAGENTA,
Colors.CYAN,
Colors.BRIGHT_BLUE,
Colors.BRIGHT_MAGENTA,
]
username_color = colors[hash(username) % len(colors)]
formatted_message = f"{Colors.DIM}[{time_str}]{Colors.RESET} {username_color}{username}:{Colors.RESET} {content}"
print(formatted_message)
self.message_history.append(formatted_message)
elif msg_type == "tool_response":
# AI/Tool response message - special formatting
username = message_data.get("username", "Unknown")
timestamp = message_data.get("timestamp", "")
if timestamp:
time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
time_str = time_obj.strftime("%H:%M:%S")
else:
time_str = datetime.now().strftime("%H:%M:%S")
# Special formatting for tool responses
print(f"\n{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}")
print(
f"{Colors.DIM}[{time_str}]{Colors.RESET} {Colors.BRIGHT_CYAN}πŸ€– AI TOOL RESPONSE{Colors.RESET}"
)
print(f"{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}")
# Split content by lines and format nicely
lines = content.split("\n")
for line in lines:
if line.strip():
if line.startswith("πŸ€–"):
print(f"{Colors.BRIGHT_WHITE}{line}{Colors.RESET}")
elif line.startswith("πŸ”§"):
print(f"{Colors.BRIGHT_GREEN}{line}{Colors.RESET}")
elif line.startswith("πŸ“Š"):
print(f"{Colors.BRIGHT_YELLOW}{line}{Colors.RESET}")
else:
print(f"{Colors.WHITE}{line}{Colors.RESET}")
print(f"{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}\n")
elif msg_type == "system":
# System message
self.print_status(f"πŸ”” {content}", Colors.BRIGHT_YELLOW)
elif msg_type == "notification":
# Notification message
timestamp = message_data.get("timestamp", "")
if timestamp:
time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
time_str = time_obj.strftime("%H:%M:%S")
print(
f"{Colors.DIM}[{time_str}]{Colors.RESET} {Colors.BRIGHT_BLUE}ℹ️ {content}{Colors.RESET}"
)
else:
self.print_status(f"ℹ️ {content}", Colors.BRIGHT_BLUE)
elif msg_type == "error":
# Error message
self.print_status(f"❌ {content}", Colors.BRIGHT_RED)
elif msg_type == "client_count":
# Update client count
self.client_count = message_data.get("count", 0)
except json.JSONDecodeError:
# Handle plain text messages
print(f"{Colors.WHITE}{message_str}{Colors.RESET}")
except Exception as e:
self.print_status(f"Error processing message: {e}", Colors.RED)
def send_message(self, message):
"""Send message to server"""
if not self.connected:
self.print_status("❌ Not connected to server", Colors.RED)
return False
try:
if message.startswith("/"):
# Command message
message_data = {"type": "command", "message": message}
else:
# Regular chat message
message_data = {"type": "chat", "message": message}
message_json = json.dumps(message_data)
self.client_socket.send(message_json.encode("utf-8"))
return True
except Exception as e:
self.print_status(f"❌ Send error: {e}", Colors.RED)
return False
def show_help(self):
"""Show client help information"""
help_text = f"""
{Colors.BRIGHT_CYAN}πŸ“‹ MCP Chat Client Help{Colors.RESET}
{"-" * 30}
{Colors.YELLOW}Chat Commands:{Colors.RESET}
/help - Show server commands
/users - List online users
/time - Show server time
/ping - Test connection
/quit - Exit the chat
/cls - Clear screen
/history - Show message history
{Colors.BRIGHT_MAGENTA}πŸ”§ MCP Tool Commands:{Colors.RESET}
/setup-tools [toolkit] - Setup tool access (gmail, calendar, etc.)
/authorize-tools - Complete tool authorization after visiting URL
{Colors.YELLOW}Natural Language Tool Usage:{Colors.RESET}
β€’ "Send an email to john@example.com saying hello"
β€’ "Schedule a meeting for tomorrow at 2pm"
β€’ "Upload a file to my Google Drive"
β€’ "Create a new GitHub repository"
{Colors.YELLOW}Special Features:{Colors.RESET}
β€’ Messages are color-coded by user
β€’ AI tool responses are highlighted
β€’ Connection status monitoring
β€’ Message timestamps
β€’ Client count display
{Colors.YELLOW}Tips:{Colors.RESET}
β€’ Press Ctrl+C to exit gracefully
β€’ Use /cls to clear the screen
β€’ Tool commands start with '/'
β€’ Natural language works for AI tools
"""
print(help_text)
def start_chat(self):
"""Start the main chat loop"""
self.running = True
# Start message receiving thread
receive_thread = threading.Thread(target=self.receive_messages, daemon=True)
receive_thread.start()
# Wait a moment for initial server messages
time.sleep(0.5)
# Show welcome message
print(
f"\n{Colors.BRIGHT_GREEN}πŸŽ‰ Welcome to MCP Chat, {self.username}!{Colors.RESET}"
)
print(f"{Colors.DIM}Type '/help' for commands, '/quit' to exit{Colors.RESET}")
if hasattr(self, "user_email") and self.user_email:
print(
f"{Colors.BRIGHT_MAGENTA}πŸ”§ Tool access enabled for {self.user_email}{Colors.RESET}"
)
print(
f"{Colors.DIM}πŸ’‘ Use /setup-tools to authorize AI tool access{Colors.RESET}"
)
print(
f"{Colors.DIM}πŸ’¬ Just chat naturally - AI will use tools when appropriate!{Colors.RESET}"
)
if self.client_count > 1:
print(f"{Colors.DIM}πŸ‘₯ {self.client_count} users online{Colors.RESET}")
print("-" * 60)
# Main chat loop
while self.running and self.connected:
try:
message = input().strip()
if not message:
continue
# Handle local commands
if message.lower() in ["/quit", "/exit", "/q"]:
break
elif message.lower() in ["/cls", "/clear"]:
self.clear_screen()
continue
elif message.lower() == "/help-client":
self.show_help()
continue
elif message.lower() == "/history":
print(f"\n{Colors.BRIGHT_CYAN}πŸ“œ Message History:{Colors.RESET}")
for msg in self.message_history[-10:]: # Show last 10 messages
print(msg)
print("-" * 30)
continue
# Send message to server
if not self.send_message(message):
self.print_status("❌ Failed to send message", Colors.RED)
break
except KeyboardInterrupt:
break
except EOFError:
break
except Exception as e:
self.print_status(f"❌ Input error: {e}", Colors.RED)
self.disconnect()
def disconnect(self):
"""Disconnect from server"""
self.running = False
self.connected = False
if self.client_socket:
try:
self.client_socket.close()
except:
pass
self.print_status("πŸ‘‹ Disconnected from server", Colors.YELLOW)
def run(self):
"""Main client application"""
try:
# Setup connection
self.get_connection_details()
self.get_username()
# Get optional email for MCP tools
self.user_email = self.get_user_email()
# Connect to server
if self.connect_to_server():
self.start_chat()
else:
self.print_status("❌ Failed to connect to server", Colors.RED)
except KeyboardInterrupt:
print(f"\n{Colors.YELLOW}πŸ‘‹ Goodbye!{Colors.RESET}")
except Exception as e:
self.print_status(f"❌ Application error: {e}", Colors.RED)
finally:
self.disconnect()
def main():
"""Main client entry point"""
client = ChatClient()
client.run()
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""
MCP-Enhanced Chat Server with Composio Integration
Simple and clean integration following the provided pattern
"""
import json
import logging
import os
import signal
import socket
import sys
import threading
import time
from datetime import datetime
from typing import Dict
# External dependencies for MCP integration
try:
from composio import Composio
from openai import OpenAI
COMPOSIO_AVAILABLE = True
except ImportError:
print(
"⚠️ Composio and OpenAI not installed. Install with: pip install composio openai"
)
COMPOSIO_AVAILABLE = False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("mcp_chat_server.log"), logging.StreamHandler()],
)
class ComposioIntegration:
"""Simple Composio integration following the provided pattern"""
def __init__(self):
if not COMPOSIO_AVAILABLE:
self.enabled = False
logging.warning("Composio not available - tool features disabled")
return
self.enabled = True
self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.composio = Composio(api_key=os.getenv("COMPOSIO_API_KEY"))
self.user_tools = {} # user_id -> tools
self.user_connections = {} # user_id -> connection status
def setup_user_tools(self, user_id: str, toolkits=["GMAIL"]):
"""Setup tools for a user following the exact pattern"""
try:
# Initialize connection request
connection_request = self.composio.toolkits.authorize(
user_id=user_id, toolkit=toolkits[0]
)
# Store connection request for the user
self.user_connections[user_id] = {
"connection_request": connection_request,
"toolkits": toolkits,
"authorized": False,
}
return {
"success": True,
"redirect_url": connection_request.redirect_url,
"toolkits": toolkits,
}
except Exception as e:
logging.error(f"Error setting up tools for {user_id}: {e}")
return {"success": False, "error": str(e)}
def complete_authorization(self, user_id: str):
"""Complete the authorization process"""
try:
if user_id not in self.user_connections:
return {"success": False, "error": "No authorization request found"}
connection_info = self.user_connections[user_id]
connection_request = connection_info["connection_request"]
toolkits = connection_info["toolkits"]
# Wait for the connection to be active (following the pattern)
connection_request.wait_for_connection()
# Fetch tools (following the pattern)
tools = self.composio.tools.get(user_id=user_id, toolkits=toolkits)
# Store tools for the user
self.user_tools[user_id] = tools
self.user_connections[user_id]["authorized"] = True
return {"success": True, "toolkits": toolkits, "tool_count": len(tools)}
except Exception as e:
logging.error(f"Error completing authorization for {user_id}: {e}")
return {"success": False, "error": str(e)}
def execute_with_tools(self, user_id: str, message: str):
"""Execute user message with tools following the exact pattern"""
try:
if user_id not in self.user_tools:
return {
"success": False,
"error": "User not authorized. Use /setup-tools first.",
}
tools = self.user_tools[user_id]
# Invoke agent (following the exact pattern)
completion = self.openai.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": message,
},
],
tools=tools,
)
# Handle Result from tool call (following the exact pattern)
result = self.composio.provider.handle_tool_calls(
user_id=user_id, response=completion
)
return {
"success": True,
"result": result,
"ai_response": completion.choices[0].message.content,
"tool_calls_made": bool(completion.choices[0].message.tool_calls),
}
except Exception as e:
logging.error(f"Error executing tools for {user_id}: {e}")
return {"success": False, "error": str(e)}
class MCPChatServer:
def __init__(self, host="127.0.0.1", port=12345):
self.host = host
self.port = port
self.server_socket = None
self.clients: Dict[socket.socket, str] = {} # socket -> username
self.client_addresses: Dict[socket.socket, tuple] = {} # socket -> address
self.client_user_ids: Dict[socket.socket, str] = {} # socket -> user_id (email)
self.running = False
self.lock = threading.Lock()
# Initialize Composio integration
self.composio_integration = ComposioIntegration()
# Setup signal handlers for graceful shutdown
signal.signal(signal.SIGINT, self.signal_handler)
signal.signal(signal.SIGTERM, self.signal_handler)
def signal_handler(self, signum, frame):
"""Handle shutdown signals gracefully"""
print("\nπŸ›‘ Server shutdown initiated...")
self.shutdown()
sys.exit(0)
def start_server(self):
"""Initialize and start the server"""
try:
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(10)
self.running = True
print("=" * 70)
print("πŸš€ MCP-ENHANCED CHAT SERVER")
print("=" * 70)
print(f"πŸ“‘ Server: {self.host}:{self.port}")
print(
f"πŸ”§ Composio: {'βœ… Ready' if self.composio_integration.enabled else '❌ Disabled'}"
)
print(f"⏰ Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 70)
logging.info(f"MCP Server started on {self.host}:{self.port}")
# Start server monitoring thread
monitor_thread = threading.Thread(target=self.server_monitor, daemon=True)
monitor_thread.start()
# Accept connections
self.accept_connections()
except Exception as e:
logging.error(f"Failed to start server: {e}")
self.shutdown()
def accept_connections(self):
"""Accept incoming client connections"""
while self.running:
try:
client_socket, address = self.server_socket.accept()
# Handle new client in separate thread
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address),
daemon=True,
)
client_thread.start()
except socket.error as e:
if self.running:
logging.error(f"Error accepting connection: {e}")
break
def handle_client(self, client_socket, address):
"""Handle individual client connection"""
username = None
user_id = None
try:
# Set socket timeout
client_socket.settimeout(300) # 5 minutes timeout
# Request username
self.send_message(
client_socket,
{"type": "system", "message": "Please enter your username:"},
)
# Receive username
username_data = client_socket.recv(1024).decode("utf-8").strip()
if not username_data:
raise ConnectionError("No username provided")
try:
username_msg = json.loads(username_data)
username = username_msg.get("message", "").strip()
except json.JSONDecodeError:
username = username_data.strip()
if not username or len(username) > 20:
self.send_message(
client_socket,
{
"type": "error",
"message": "Invalid username. Must be 1-20 characters.",
},
)
return
# Request user ID (email) for Composio
if self.composio_integration.enabled:
self.send_message(
client_socket,
{
"type": "system",
"message": "Please enter your email for tool access (or press Enter to skip):",
},
)
user_id_data = client_socket.recv(1024).decode("utf-8").strip()
try:
user_id_msg = json.loads(user_id_data)
user_id = user_id_msg.get("message", "").strip()
except json.JSONDecodeError:
user_id = user_id_data.strip()
# Check if username is already taken
with self.lock:
if username in self.clients.values():
self.send_message(
client_socket,
{
"type": "error",
"message": "Username already taken. Please choose another.",
},
)
return
# Add client to active clients
self.clients[client_socket] = username
self.client_addresses[client_socket] = address
if user_id:
self.client_user_ids[client_socket] = user_id
# Welcome the new client
welcome_msg = f"Welcome to the MCP Chat, {username}! πŸŽ‰"
if self.composio_integration.enabled and user_id:
welcome_msg += f"\nπŸ”§ Tool access enabled for {user_id}"
welcome_msg += "\nπŸ’‘ Use /setup-tools to authorize tool access"
elif self.composio_integration.enabled:
welcome_msg += "\nπŸ’‘ Reconnect with email to enable tool access"
self.send_message(client_socket, {"type": "system", "message": welcome_msg})
# Notify all clients about new user
self.broadcast_message(
{
"type": "notification",
"message": f"πŸ‘‹ {username} joined the chat",
"timestamp": datetime.now().isoformat(),
},
exclude=client_socket,
)
print(f"βœ… {username} ({address[0]}:{address[1]}) connected")
print(f"πŸ‘₯ Active clients: {len(self.clients)}")
logging.info(f"Client {username} connected from {address}")
# Send current client count
self.send_client_count()
# Handle client messages
while self.running:
try:
data = client_socket.recv(1024)
if not data:
break
message_str = data.decode("utf-8").strip()
if not message_str:
continue
try:
message_data = json.loads(message_str)
self.process_message(client_socket, username, message_data)
except json.JSONDecodeError:
# Handle plain text messages for backward compatibility
self.process_message(
client_socket,
username,
{"type": "chat", "message": message_str},
)
except socket.timeout:
self.send_message(
client_socket,
{
"type": "system",
"message": "⚠️ Connection timeout. Please send a message to stay connected.",
},
)
except socket.error:
break
except Exception as e:
logging.error(f"Error handling client {username or 'unknown'}: {e}")
finally:
self.disconnect_client(client_socket, username)
def process_message(self, client_socket, username, message_data):
"""Process different types of messages"""
msg_type = message_data.get("type", "chat")
message = message_data.get("message", "").strip()
if not message:
return
if msg_type == "chat":
# Check if this might be a tool request
user_id = self.client_user_ids.get(client_socket)
if (
self.composio_integration.enabled
and user_id
and user_id in self.composio_integration.user_tools
):
# Try to execute with tools
self.execute_tool_message(client_socket, username, user_id, message)
else:
# Regular chat message
chat_message = {
"type": "chat",
"username": username,
"message": message,
"timestamp": datetime.now().isoformat(),
}
self.broadcast_message(chat_message)
logging.info(f"Message from {username}: {message}")
elif msg_type == "command":
# Handle special commands
self.handle_command(client_socket, username, message)
def execute_tool_message(self, client_socket, username, user_id, message):
"""Execute message with tool support"""
try:
# Show that we're processing the request
self.broadcast_message(
{
"type": "notification",
"message": f"πŸ”§ {username} is using AI tools...",
"timestamp": datetime.now().isoformat(),
}
)
# Execute with tools
result = self.composio_integration.execute_with_tools(user_id, message)
if result["success"]:
# Show the result to everyone
response_msg = f"πŸ€– AI Response for {username}:\n"
if result["ai_response"]:
response_msg += f"{result['ai_response']}\n"
if result["tool_calls_made"]:
response_msg += f"πŸ”§ Tool Action Completed!\n"
if result["result"]:
response_msg += f"πŸ“Š Result: {str(result['result'])[:200]}..."
self.broadcast_message(
{
"type": "tool_response",
"username": username,
"message": response_msg,
"timestamp": datetime.now().isoformat(),
}
)
else:
# Show error
self.send_message(
client_socket,
{
"type": "error",
"message": f"Tool execution failed: {result['error']}",
},
)
except Exception as e:
logging.error(f"Tool execution error: {e}")
self.send_message(
client_socket,
{"type": "error", "message": f"Tool execution error: {str(e)}"},
)
def handle_command(self, client_socket, username, command):
"""Handle special chat commands"""
if command.startswith("/"):
cmd_parts = command[1:].split()
cmd = cmd_parts[0].lower()
if cmd == "setup-tools":
self.handle_setup_tools(client_socket, username, cmd_parts[1:])
elif cmd == "authorize-tools":
self.handle_authorize_tools(client_socket, username)
elif cmd == "help":
help_text = """
Available commands:
/help - Show this help message
/users - List all online users
/time - Show current server time
/ping - Test connection
/setup-tools [toolkit] - Setup tool access (e.g., /setup-tools gmail)
/authorize-tools - Complete tool authorization
"""
self.send_message(
client_socket, {"type": "system", "message": help_text}
)
elif cmd == "users":
with self.lock:
users = list(self.clients.values())
self.send_message(
client_socket,
{
"type": "system",
"message": f"Online users ({len(users)}): {', '.join(users)}",
},
)
elif cmd == "time":
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.send_message(
client_socket,
{"type": "system", "message": f"Server time: {current_time}"},
)
elif cmd == "ping":
self.send_message(
client_socket, {"type": "system", "message": "πŸ“ Pong!"}
)
else:
self.send_message(
client_socket,
{
"type": "error",
"message": f"Unknown command: /{cmd}. Type /help for available commands.",
},
)
def handle_setup_tools(self, client_socket, username, args):
"""Handle tool setup command"""
if not self.composio_integration.enabled:
self.send_message(
client_socket,
{"type": "error", "message": "Composio integration not available"},
)
return
user_id = self.client_user_ids.get(client_socket)
if not user_id:
self.send_message(
client_socket,
{
"type": "error",
"message": "Email required for tool access. Please reconnect with your email.",
},
)
return
# Default to Gmail if no toolkit specified
toolkit = args[0].upper() if args else "GMAIL"
# Setup tools
result = self.composio_integration.setup_user_tools(user_id, [toolkit])
if result["success"]:
self.send_message(
client_socket,
{
"type": "system",
"message": f"πŸ”— Visit this URL to authorize {toolkit}:\nπŸ‘‰ {result['redirect_url']}\n\nThen use /authorize-tools to complete setup",
},
)
# Notify others
self.broadcast_message(
{
"type": "notification",
"message": f"πŸ”§ {username} is setting up {toolkit} tools",
"timestamp": datetime.now().isoformat(),
},
exclude=client_socket,
)
else:
self.send_message(
client_socket,
{
"type": "error",
"message": f"Failed to setup tools: {result['error']}",
},
)
def handle_authorize_tools(self, client_socket, username):
"""Handle tool authorization completion"""
if not self.composio_integration.enabled:
self.send_message(
client_socket,
{"type": "error", "message": "Composio integration not available"},
)
return
user_id = self.client_user_ids.get(client_socket)
if not user_id:
self.send_message(
client_socket,
{"type": "error", "message": "Email required for tool access"},
)
return
# Complete authorization
result = self.composio_integration.complete_authorization(user_id)
if result["success"]:
self.send_message(
client_socket,
{
"type": "system",
"message": f"βœ… Tool authorization complete! You now have access to {result['tool_count']} tools.\nπŸ’‘ Just chat normally - AI will use tools when needed!",
},
)
# Notify others
self.broadcast_message(
{
"type": "notification",
"message": f"πŸŽ‰ {username} now has AI tool access!",
"timestamp": datetime.now().isoformat(),
},
exclude=client_socket,
)
else:
self.send_message(
client_socket,
{
"type": "error",
"message": f"Authorization failed: {result['error']}",
},
)
def send_message(self, client_socket, message_data):
"""Send message to a specific client"""
try:
message_json = json.dumps(message_data) + "\n"
client_socket.send(message_json.encode("utf-8"))
except Exception as e:
logging.error(f"Error sending message to client: {e}")
def broadcast_message(self, message_data, exclude=None):
"""Broadcast message to all connected clients"""
with self.lock:
disconnected_clients = []
for client_socket in list(self.clients.keys()):
if client_socket == exclude:
continue
try:
self.send_message(client_socket, message_data)
except Exception as e:
logging.error(f"Error broadcasting to client: {e}")
disconnected_clients.append(client_socket)
# Remove disconnected clients
for client_socket in disconnected_clients:
self.disconnect_client(client_socket, self.clients.get(client_socket))
def send_client_count(self):
"""Send current client count to all clients"""
with self.lock:
count = len(self.clients)
self.broadcast_message({"type": "client_count", "count": count})
def disconnect_client(self, client_socket, username):
"""Handle client disconnection"""
try:
with self.lock:
if client_socket in self.clients:
username = self.clients[client_socket]
del self.clients[client_socket]
if client_socket in self.client_addresses:
address = self.client_addresses[client_socket]
del self.client_addresses[client_socket]
print(
f"❌ {username or 'Unknown'} ({address[0]}:{address[1]}) disconnected"
)
if client_socket in self.client_user_ids:
del self.client_user_ids[client_socket]
print(f"πŸ‘₯ Active clients: {len(self.clients)}")
client_socket.close()
if username:
# Notify other clients
self.broadcast_message(
{
"type": "notification",
"message": f"πŸ‘‹ {username} left the chat",
"timestamp": datetime.now().isoformat(),
}
)
logging.info(f"Client {username} disconnected")
# Update client count
self.send_client_count()
except Exception as e:
logging.error(f"Error disconnecting client: {e}")
def server_monitor(self):
"""Monitor server status and display periodic updates"""
while self.running:
try:
time.sleep(30) # Update every 30 seconds
with self.lock:
client_count = len(self.clients)
if client_count > 0:
print(
f"πŸ“Š Server Status: {client_count} active clients - {datetime.now().strftime('%H:%M:%S')}"
)
except Exception as e:
logging.error(f"Server monitor error: {e}")
def shutdown(self):
"""Gracefully shutdown the server"""
print("\nπŸ”„ Shutting down server...")
self.running = False
# Notify all clients about server shutdown
self.broadcast_message(
{
"type": "system",
"message": "πŸ›‘ Server is shutting down. Thank you for using MCP Chat!",
}
)
# Close all client connections
with self.lock:
for client_socket in list(self.clients.keys()):
try:
client_socket.close()
except:
pass
self.clients.clear()
self.client_addresses.clear()
self.client_user_ids.clear()
# Close server socket
if self.server_socket:
try:
self.server_socket.close()
except:
pass
print("βœ… Server shutdown complete")
logging.info("Server shutdown complete")
def main():
"""Main server entry point"""
try:
# Get server configuration
host = input("Enter server host (default: 127.0.0.1): ").strip() or "127.0.0.1"
port_input = input("Enter server port (default: 12345): ").strip()
port = int(port_input) if port_input else 12345
# Create and start server
server = MCPChatServer(host, port)
server.start_server()
except KeyboardInterrupt:
print("\nπŸ‘‹ Server stopped by user")
except ValueError:
print("❌ Invalid port number")
except Exception as e:
print(f"❌ Server error: {e}")
logging.error(f"Server startup error: {e}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment