-
-
Save shricodev/cf1d11f500ab270937582ff01eb63135 to your computer and use it in GitHub Desktop.
CLI MCP Chat Client with Composio (Developed by Claude Sonnet 4 AI Model) - Blog Demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Enhanced Chat Client | |
| Features: | |
| - Robust connection handling | |
| - Rich UI with colors and formatting | |
| - Automatic reconnection | |
| - Message history | |
| - Command support | |
| - Connection status monitoring | |
| """ | |
| import json | |
| import os | |
| import signal | |
| import socket | |
| import sys | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import List, Optional | |
| # Color codes for terminal output | |
| class Colors: | |
| RESET = "\033[0m" | |
| BOLD = "\033[1m" | |
| DIM = "\033[2m" | |
| # Text colors | |
| BLACK = "\033[30m" | |
| RED = "\033[31m" | |
| GREEN = "\033[32m" | |
| YELLOW = "\033[33m" | |
| BLUE = "\033[34m" | |
| MAGENTA = "\033[35m" | |
| CYAN = "\033[36m" | |
| WHITE = "\033[37m" | |
| # Bright colors | |
| BRIGHT_RED = "\033[91m" | |
| BRIGHT_GREEN = "\033[92m" | |
| BRIGHT_YELLOW = "\033[93m" | |
| BRIGHT_BLUE = "\033[94m" | |
| BRIGHT_MAGENTA = "\033[95m" | |
| BRIGHT_CYAN = "\033[96m" | |
| BRIGHT_WHITE = "\033[97m" | |
| class ChatClient: | |
| def __init__(self): | |
| self.client_socket = None | |
| self.connected = False | |
| self.username = "" | |
| self.server_ip = "" | |
| self.server_port = 12345 | |
| self.running = False | |
| self.message_history: List[str] = [] | |
| self.client_count = 0 | |
| self.last_ping = time.time() | |
| # Setup signal handlers | |
| signal.signal(signal.SIGINT, self.signal_handler) | |
| signal.signal(signal.SIGTERM, self.signal_handler) | |
| def signal_handler(self, signum, frame): | |
| """Handle shutdown signals gracefully""" | |
| print(f"\n{Colors.YELLOW}π Goodbye!{Colors.RESET}") | |
| self.disconnect() | |
| sys.exit(0) | |
| def clear_screen(self): | |
| """Clear terminal screen""" | |
| os.system("cls" if os.name == "nt" else "clear") | |
| def print_header(self): | |
| """Print application header""" | |
| self.clear_screen() | |
| print(f"{Colors.BRIGHT_CYAN}{Colors.BOLD}") | |
| print("=" * 60) | |
| print("π£οΈ ENHANCED CHAT CLIENT") | |
| print("=" * 60) | |
| print(f"{Colors.RESET}") | |
| def print_status(self, message, color=Colors.WHITE): | |
| """Print status message with color""" | |
| timestamp = datetime.now().strftime("%H:%M:%S") | |
| print(f"{Colors.DIM}[{timestamp}]{Colors.RESET} {color}{message}{Colors.RESET}") | |
| def get_connection_details(self): | |
| """Get server connection details from user""" | |
| self.print_header() | |
| print(f"{Colors.BRIGHT_YELLOW}π‘ Server Connection Setup{Colors.RESET}") | |
| print("-" * 30) | |
| while True: | |
| try: | |
| # Get server IP | |
| server_input = input( | |
| f"{Colors.CYAN}Enter server IP (default: 127.0.0.1): {Colors.RESET}" | |
| ).strip() | |
| self.server_ip = server_input if server_input else "127.0.0.1" | |
| # Validate IP format (basic validation) | |
| if self.server_ip != "127.0.0.1" and self.server_ip != "localhost": | |
| ip_parts = self.server_ip.split(".") | |
| if len(ip_parts) != 4 or not all( | |
| part.isdigit() and 0 <= int(part) <= 255 for part in ip_parts | |
| ): | |
| print(f"{Colors.RED}β Invalid IP address format{Colors.RESET}") | |
| continue | |
| # Get server port | |
| port_input = input( | |
| f"{Colors.CYAN}Enter server port (default: 12345): {Colors.RESET}" | |
| ).strip() | |
| if port_input: | |
| self.server_port = int(port_input) | |
| if not (1 <= self.server_port <= 65535): | |
| print( | |
| f"{Colors.RED}β Port must be between 1 and 65535{Colors.RESET}" | |
| ) | |
| continue | |
| break | |
| except ValueError: | |
| print(f"{Colors.RED}β Invalid port number{Colors.RESET}") | |
| except KeyboardInterrupt: | |
| print(f"\n{Colors.YELLOW}π Goodbye!{Colors.RESET}") | |
| sys.exit(0) | |
| def get_username(self): | |
| """Get username from user""" | |
| while True: | |
| try: | |
| username = input( | |
| f"{Colors.CYAN}Enter your username (1-20 characters): {Colors.RESET}" | |
| ).strip() | |
| if not username: | |
| print(f"{Colors.RED}β Username cannot be empty{Colors.RESET}") | |
| continue | |
| if len(username) > 20: | |
| print( | |
| f"{Colors.RED}β Username too long (max 20 characters){Colors.RESET}" | |
| ) | |
| continue | |
| if not username.replace("_", "").replace("-", "").isalnum(): | |
| print( | |
| f"{Colors.RED}β Username can only contain letters, numbers, hyphens, and underscores{Colors.RESET}" | |
| ) | |
| continue | |
| self.username = username | |
| break | |
| except KeyboardInterrupt: | |
| print(f"\n{Colors.YELLOW}π Goodbye!{Colors.RESET}") | |
| sys.exit(0) | |
| def get_user_email(self): | |
| """Get user email for MCP tool access (optional)""" | |
| try: | |
| print(f"\n{Colors.BRIGHT_MAGENTA}π§ MCP Tool Access Setup{Colors.RESET}") | |
| print( | |
| f"{Colors.DIM}Enter your email to enable AI tool access (Gmail, Calendar, etc.){Colors.RESET}" | |
| ) | |
| print(f"{Colors.DIM}Or press Enter to skip tool features{Colors.RESET}") | |
| email = input(f"{Colors.CYAN}Email (optional): {Colors.RESET}").strip() | |
| if email and "@" in email: | |
| return email | |
| elif email: | |
| print( | |
| f"{Colors.YELLOW}β οΈ Invalid email format, skipping tool access{Colors.RESET}" | |
| ) | |
| time.sleep(1) | |
| return None | |
| except KeyboardInterrupt: | |
| print(f"\n{Colors.YELLOW}π Goodbye!{Colors.RESET}") | |
| sys.exit(0) | |
| def connect_to_server(self, max_retries=3): | |
| """Connect to the chat server with retry logic""" | |
| for attempt in range(max_retries): | |
| try: | |
| self.print_status( | |
| f"π Connecting to {self.server_ip}:{self.server_port} (attempt {attempt + 1}/{max_retries})", | |
| Colors.YELLOW, | |
| ) | |
| self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.client_socket.settimeout(10) # 10 second timeout | |
| # Connect to server | |
| self.client_socket.connect((self.server_ip, self.server_port)) | |
| self.connected = True | |
| self.print_status("β Connected to server successfully!", Colors.GREEN) | |
| # Send username | |
| username_message = json.dumps( | |
| {"type": "auth", "message": self.username} | |
| ) | |
| self.client_socket.send(username_message.encode("utf-8")) | |
| # Send email if provided | |
| if hasattr(self, "user_email") and self.user_email: | |
| # Wait for email prompt from server | |
| time.sleep(0.1) | |
| email_message = json.dumps( | |
| {"type": "auth", "message": self.user_email} | |
| ) | |
| self.client_socket.send(email_message.encode("utf-8")) | |
| return True | |
| except socket.timeout: | |
| self.print_status("β° Connection timeout", Colors.RED) | |
| except ConnectionRefusedError: | |
| self.print_status( | |
| "β Connection refused - server may be down", Colors.RED | |
| ) | |
| except Exception as e: | |
| self.print_status(f"β Connection error: {e}", Colors.RED) | |
| if attempt < max_retries - 1: | |
| self.print_status("β³ Retrying in 2 seconds...", Colors.YELLOW) | |
| time.sleep(2) | |
| return False | |
| def receive_messages(self): | |
| """Receive messages from server in a separate thread""" | |
| buffer = "" | |
| while self.running and self.connected: | |
| try: | |
| data = self.client_socket.recv(1024).decode("utf-8") | |
| if not data: | |
| break | |
| buffer += data | |
| # Process complete messages (separated by newlines) | |
| while "\n" in buffer: | |
| line, buffer = buffer.split("\n", 1) | |
| if line.strip(): | |
| self.process_received_message(line.strip()) | |
| self.last_ping = time.time() | |
| except socket.timeout: | |
| # Check if connection is still alive | |
| if time.time() - self.last_ping > 60: # 1 minute timeout | |
| self.print_status("β οΈ Connection timeout", Colors.RED) | |
| break | |
| except socket.error: | |
| break | |
| except Exception as e: | |
| self.print_status(f"β Receive error: {e}", Colors.RED) | |
| break | |
| if self.running: | |
| self.print_status("β Connection lost", Colors.RED) | |
| self.connected = False | |
| def process_received_message(self, message_str): | |
| """Process received message from server""" | |
| try: | |
| message_data = json.loads(message_str) | |
| msg_type = message_data.get("type", "unknown") | |
| content = message_data.get("message", "") | |
| if msg_type == "chat": | |
| # Regular chat message | |
| username = message_data.get("username", "Unknown") | |
| timestamp = message_data.get("timestamp", "") | |
| if timestamp: | |
| time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) | |
| time_str = time_obj.strftime("%H:%M:%S") | |
| else: | |
| time_str = datetime.now().strftime("%H:%M:%S") | |
| # Color code based on username | |
| if username == self.username: | |
| username_color = Colors.BRIGHT_GREEN | |
| else: | |
| # Simple hash-based color assignment | |
| colors = [ | |
| Colors.BLUE, | |
| Colors.MAGENTA, | |
| Colors.CYAN, | |
| Colors.BRIGHT_BLUE, | |
| Colors.BRIGHT_MAGENTA, | |
| ] | |
| username_color = colors[hash(username) % len(colors)] | |
| formatted_message = f"{Colors.DIM}[{time_str}]{Colors.RESET} {username_color}{username}:{Colors.RESET} {content}" | |
| print(formatted_message) | |
| self.message_history.append(formatted_message) | |
| elif msg_type == "tool_response": | |
| # AI/Tool response message - special formatting | |
| username = message_data.get("username", "Unknown") | |
| timestamp = message_data.get("timestamp", "") | |
| if timestamp: | |
| time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) | |
| time_str = time_obj.strftime("%H:%M:%S") | |
| else: | |
| time_str = datetime.now().strftime("%H:%M:%S") | |
| # Special formatting for tool responses | |
| print(f"\n{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}") | |
| print( | |
| f"{Colors.DIM}[{time_str}]{Colors.RESET} {Colors.BRIGHT_CYAN}π€ AI TOOL RESPONSE{Colors.RESET}" | |
| ) | |
| print(f"{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}") | |
| # Split content by lines and format nicely | |
| lines = content.split("\n") | |
| for line in lines: | |
| if line.strip(): | |
| if line.startswith("π€"): | |
| print(f"{Colors.BRIGHT_WHITE}{line}{Colors.RESET}") | |
| elif line.startswith("π§"): | |
| print(f"{Colors.BRIGHT_GREEN}{line}{Colors.RESET}") | |
| elif line.startswith("π"): | |
| print(f"{Colors.BRIGHT_YELLOW}{line}{Colors.RESET}") | |
| else: | |
| print(f"{Colors.WHITE}{line}{Colors.RESET}") | |
| print(f"{Colors.BRIGHT_CYAN}{'=' * 60}{Colors.RESET}\n") | |
| elif msg_type == "system": | |
| # System message | |
| self.print_status(f"π {content}", Colors.BRIGHT_YELLOW) | |
| elif msg_type == "notification": | |
| # Notification message | |
| timestamp = message_data.get("timestamp", "") | |
| if timestamp: | |
| time_obj = datetime.fromisoformat(timestamp.replace("Z", "+00:00")) | |
| time_str = time_obj.strftime("%H:%M:%S") | |
| print( | |
| f"{Colors.DIM}[{time_str}]{Colors.RESET} {Colors.BRIGHT_BLUE}βΉοΈ {content}{Colors.RESET}" | |
| ) | |
| else: | |
| self.print_status(f"βΉοΈ {content}", Colors.BRIGHT_BLUE) | |
| elif msg_type == "error": | |
| # Error message | |
| self.print_status(f"β {content}", Colors.BRIGHT_RED) | |
| elif msg_type == "client_count": | |
| # Update client count | |
| self.client_count = message_data.get("count", 0) | |
| except json.JSONDecodeError: | |
| # Handle plain text messages | |
| print(f"{Colors.WHITE}{message_str}{Colors.RESET}") | |
| except Exception as e: | |
| self.print_status(f"Error processing message: {e}", Colors.RED) | |
| def send_message(self, message): | |
| """Send message to server""" | |
| if not self.connected: | |
| self.print_status("β Not connected to server", Colors.RED) | |
| return False | |
| try: | |
| if message.startswith("/"): | |
| # Command message | |
| message_data = {"type": "command", "message": message} | |
| else: | |
| # Regular chat message | |
| message_data = {"type": "chat", "message": message} | |
| message_json = json.dumps(message_data) | |
| self.client_socket.send(message_json.encode("utf-8")) | |
| return True | |
| except Exception as e: | |
| self.print_status(f"β Send error: {e}", Colors.RED) | |
| return False | |
| def show_help(self): | |
| """Show client help information""" | |
| help_text = f""" | |
| {Colors.BRIGHT_CYAN}π MCP Chat Client Help{Colors.RESET} | |
| {"-" * 30} | |
| {Colors.YELLOW}Chat Commands:{Colors.RESET} | |
| /help - Show server commands | |
| /users - List online users | |
| /time - Show server time | |
| /ping - Test connection | |
| /quit - Exit the chat | |
| /cls - Clear screen | |
| /history - Show message history | |
| {Colors.BRIGHT_MAGENTA}π§ MCP Tool Commands:{Colors.RESET} | |
| /setup-tools [toolkit] - Setup tool access (gmail, calendar, etc.) | |
| /authorize-tools - Complete tool authorization after visiting URL | |
| {Colors.YELLOW}Natural Language Tool Usage:{Colors.RESET} | |
| β’ "Send an email to john@example.com saying hello" | |
| β’ "Schedule a meeting for tomorrow at 2pm" | |
| β’ "Upload a file to my Google Drive" | |
| β’ "Create a new GitHub repository" | |
| {Colors.YELLOW}Special Features:{Colors.RESET} | |
| β’ Messages are color-coded by user | |
| β’ AI tool responses are highlighted | |
| β’ Connection status monitoring | |
| β’ Message timestamps | |
| β’ Client count display | |
| {Colors.YELLOW}Tips:{Colors.RESET} | |
| β’ Press Ctrl+C to exit gracefully | |
| β’ Use /cls to clear the screen | |
| β’ Tool commands start with '/' | |
| β’ Natural language works for AI tools | |
| """ | |
| print(help_text) | |
| def start_chat(self): | |
| """Start the main chat loop""" | |
| self.running = True | |
| # Start message receiving thread | |
| receive_thread = threading.Thread(target=self.receive_messages, daemon=True) | |
| receive_thread.start() | |
| # Wait a moment for initial server messages | |
| time.sleep(0.5) | |
| # Show welcome message | |
| print( | |
| f"\n{Colors.BRIGHT_GREEN}π Welcome to MCP Chat, {self.username}!{Colors.RESET}" | |
| ) | |
| print(f"{Colors.DIM}Type '/help' for commands, '/quit' to exit{Colors.RESET}") | |
| if hasattr(self, "user_email") and self.user_email: | |
| print( | |
| f"{Colors.BRIGHT_MAGENTA}π§ Tool access enabled for {self.user_email}{Colors.RESET}" | |
| ) | |
| print( | |
| f"{Colors.DIM}π‘ Use /setup-tools to authorize AI tool access{Colors.RESET}" | |
| ) | |
| print( | |
| f"{Colors.DIM}π¬ Just chat naturally - AI will use tools when appropriate!{Colors.RESET}" | |
| ) | |
| if self.client_count > 1: | |
| print(f"{Colors.DIM}π₯ {self.client_count} users online{Colors.RESET}") | |
| print("-" * 60) | |
| # Main chat loop | |
| while self.running and self.connected: | |
| try: | |
| message = input().strip() | |
| if not message: | |
| continue | |
| # Handle local commands | |
| if message.lower() in ["/quit", "/exit", "/q"]: | |
| break | |
| elif message.lower() in ["/cls", "/clear"]: | |
| self.clear_screen() | |
| continue | |
| elif message.lower() == "/help-client": | |
| self.show_help() | |
| continue | |
| elif message.lower() == "/history": | |
| print(f"\n{Colors.BRIGHT_CYAN}π Message History:{Colors.RESET}") | |
| for msg in self.message_history[-10:]: # Show last 10 messages | |
| print(msg) | |
| print("-" * 30) | |
| continue | |
| # Send message to server | |
| if not self.send_message(message): | |
| self.print_status("β Failed to send message", Colors.RED) | |
| break | |
| except KeyboardInterrupt: | |
| break | |
| except EOFError: | |
| break | |
| except Exception as e: | |
| self.print_status(f"β Input error: {e}", Colors.RED) | |
| self.disconnect() | |
| def disconnect(self): | |
| """Disconnect from server""" | |
| self.running = False | |
| self.connected = False | |
| if self.client_socket: | |
| try: | |
| self.client_socket.close() | |
| except: | |
| pass | |
| self.print_status("π Disconnected from server", Colors.YELLOW) | |
| def run(self): | |
| """Main client application""" | |
| try: | |
| # Setup connection | |
| self.get_connection_details() | |
| self.get_username() | |
| # Get optional email for MCP tools | |
| self.user_email = self.get_user_email() | |
| # Connect to server | |
| if self.connect_to_server(): | |
| self.start_chat() | |
| else: | |
| self.print_status("β Failed to connect to server", Colors.RED) | |
| except KeyboardInterrupt: | |
| print(f"\n{Colors.YELLOW}π Goodbye!{Colors.RESET}") | |
| except Exception as e: | |
| self.print_status(f"β Application error: {e}", Colors.RED) | |
| finally: | |
| self.disconnect() | |
| def main(): | |
| """Main client entry point""" | |
| client = ChatClient() | |
| client.run() | |
| if __name__ == "__main__": | |
| main() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| MCP-Enhanced Chat Server with Composio Integration | |
| Simple and clean integration following the provided pattern | |
| """ | |
| import json | |
| import logging | |
| import os | |
| import signal | |
| import socket | |
| import sys | |
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Dict | |
| # External dependencies for MCP integration | |
| try: | |
| from composio import Composio | |
| from openai import OpenAI | |
| COMPOSIO_AVAILABLE = True | |
| except ImportError: | |
| print( | |
| "β οΈ Composio and OpenAI not installed. Install with: pip install composio openai" | |
| ) | |
| COMPOSIO_AVAILABLE = False | |
| # Configure logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s - %(levelname)s - %(message)s", | |
| handlers=[logging.FileHandler("mcp_chat_server.log"), logging.StreamHandler()], | |
| ) | |
| class ComposioIntegration: | |
| """Simple Composio integration following the provided pattern""" | |
| def __init__(self): | |
| if not COMPOSIO_AVAILABLE: | |
| self.enabled = False | |
| logging.warning("Composio not available - tool features disabled") | |
| return | |
| self.enabled = True | |
| self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| self.composio = Composio(api_key=os.getenv("COMPOSIO_API_KEY")) | |
| self.user_tools = {} # user_id -> tools | |
| self.user_connections = {} # user_id -> connection status | |
| def setup_user_tools(self, user_id: str, toolkits=["GMAIL"]): | |
| """Setup tools for a user following the exact pattern""" | |
| try: | |
| # Initialize connection request | |
| connection_request = self.composio.toolkits.authorize( | |
| user_id=user_id, toolkit=toolkits[0] | |
| ) | |
| # Store connection request for the user | |
| self.user_connections[user_id] = { | |
| "connection_request": connection_request, | |
| "toolkits": toolkits, | |
| "authorized": False, | |
| } | |
| return { | |
| "success": True, | |
| "redirect_url": connection_request.redirect_url, | |
| "toolkits": toolkits, | |
| } | |
| except Exception as e: | |
| logging.error(f"Error setting up tools for {user_id}: {e}") | |
| return {"success": False, "error": str(e)} | |
| def complete_authorization(self, user_id: str): | |
| """Complete the authorization process""" | |
| try: | |
| if user_id not in self.user_connections: | |
| return {"success": False, "error": "No authorization request found"} | |
| connection_info = self.user_connections[user_id] | |
| connection_request = connection_info["connection_request"] | |
| toolkits = connection_info["toolkits"] | |
| # Wait for the connection to be active (following the pattern) | |
| connection_request.wait_for_connection() | |
| # Fetch tools (following the pattern) | |
| tools = self.composio.tools.get(user_id=user_id, toolkits=toolkits) | |
| # Store tools for the user | |
| self.user_tools[user_id] = tools | |
| self.user_connections[user_id]["authorized"] = True | |
| return {"success": True, "toolkits": toolkits, "tool_count": len(tools)} | |
| except Exception as e: | |
| logging.error(f"Error completing authorization for {user_id}: {e}") | |
| return {"success": False, "error": str(e)} | |
| def execute_with_tools(self, user_id: str, message: str): | |
| """Execute user message with tools following the exact pattern""" | |
| try: | |
| if user_id not in self.user_tools: | |
| return { | |
| "success": False, | |
| "error": "User not authorized. Use /setup-tools first.", | |
| } | |
| tools = self.user_tools[user_id] | |
| # Invoke agent (following the exact pattern) | |
| completion = self.openai.chat.completions.create( | |
| model="gpt-4o", | |
| messages=[ | |
| { | |
| "role": "user", | |
| "content": message, | |
| }, | |
| ], | |
| tools=tools, | |
| ) | |
| # Handle Result from tool call (following the exact pattern) | |
| result = self.composio.provider.handle_tool_calls( | |
| user_id=user_id, response=completion | |
| ) | |
| return { | |
| "success": True, | |
| "result": result, | |
| "ai_response": completion.choices[0].message.content, | |
| "tool_calls_made": bool(completion.choices[0].message.tool_calls), | |
| } | |
| except Exception as e: | |
| logging.error(f"Error executing tools for {user_id}: {e}") | |
| return {"success": False, "error": str(e)} | |
| class MCPChatServer: | |
| def __init__(self, host="127.0.0.1", port=12345): | |
| self.host = host | |
| self.port = port | |
| self.server_socket = None | |
| self.clients: Dict[socket.socket, str] = {} # socket -> username | |
| self.client_addresses: Dict[socket.socket, tuple] = {} # socket -> address | |
| self.client_user_ids: Dict[socket.socket, str] = {} # socket -> user_id (email) | |
| self.running = False | |
| self.lock = threading.Lock() | |
| # Initialize Composio integration | |
| self.composio_integration = ComposioIntegration() | |
| # Setup signal handlers for graceful shutdown | |
| signal.signal(signal.SIGINT, self.signal_handler) | |
| signal.signal(signal.SIGTERM, self.signal_handler) | |
| def signal_handler(self, signum, frame): | |
| """Handle shutdown signals gracefully""" | |
| print("\nπ Server shutdown initiated...") | |
| self.shutdown() | |
| sys.exit(0) | |
| def start_server(self): | |
| """Initialize and start the server""" | |
| try: | |
| self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
| self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
| self.server_socket.bind((self.host, self.port)) | |
| self.server_socket.listen(10) | |
| self.running = True | |
| print("=" * 70) | |
| print("π MCP-ENHANCED CHAT SERVER") | |
| print("=" * 70) | |
| print(f"π‘ Server: {self.host}:{self.port}") | |
| print( | |
| f"π§ Composio: {'β Ready' if self.composio_integration.enabled else 'β Disabled'}" | |
| ) | |
| print(f"β° Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
| print("=" * 70) | |
| logging.info(f"MCP Server started on {self.host}:{self.port}") | |
| # Start server monitoring thread | |
| monitor_thread = threading.Thread(target=self.server_monitor, daemon=True) | |
| monitor_thread.start() | |
| # Accept connections | |
| self.accept_connections() | |
| except Exception as e: | |
| logging.error(f"Failed to start server: {e}") | |
| self.shutdown() | |
| def accept_connections(self): | |
| """Accept incoming client connections""" | |
| while self.running: | |
| try: | |
| client_socket, address = self.server_socket.accept() | |
| # Handle new client in separate thread | |
| client_thread = threading.Thread( | |
| target=self.handle_client, | |
| args=(client_socket, address), | |
| daemon=True, | |
| ) | |
| client_thread.start() | |
| except socket.error as e: | |
| if self.running: | |
| logging.error(f"Error accepting connection: {e}") | |
| break | |
| def handle_client(self, client_socket, address): | |
| """Handle individual client connection""" | |
| username = None | |
| user_id = None | |
| try: | |
| # Set socket timeout | |
| client_socket.settimeout(300) # 5 minutes timeout | |
| # Request username | |
| self.send_message( | |
| client_socket, | |
| {"type": "system", "message": "Please enter your username:"}, | |
| ) | |
| # Receive username | |
| username_data = client_socket.recv(1024).decode("utf-8").strip() | |
| if not username_data: | |
| raise ConnectionError("No username provided") | |
| try: | |
| username_msg = json.loads(username_data) | |
| username = username_msg.get("message", "").strip() | |
| except json.JSONDecodeError: | |
| username = username_data.strip() | |
| if not username or len(username) > 20: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": "Invalid username. Must be 1-20 characters.", | |
| }, | |
| ) | |
| return | |
| # Request user ID (email) for Composio | |
| if self.composio_integration.enabled: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "system", | |
| "message": "Please enter your email for tool access (or press Enter to skip):", | |
| }, | |
| ) | |
| user_id_data = client_socket.recv(1024).decode("utf-8").strip() | |
| try: | |
| user_id_msg = json.loads(user_id_data) | |
| user_id = user_id_msg.get("message", "").strip() | |
| except json.JSONDecodeError: | |
| user_id = user_id_data.strip() | |
| # Check if username is already taken | |
| with self.lock: | |
| if username in self.clients.values(): | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": "Username already taken. Please choose another.", | |
| }, | |
| ) | |
| return | |
| # Add client to active clients | |
| self.clients[client_socket] = username | |
| self.client_addresses[client_socket] = address | |
| if user_id: | |
| self.client_user_ids[client_socket] = user_id | |
| # Welcome the new client | |
| welcome_msg = f"Welcome to the MCP Chat, {username}! π" | |
| if self.composio_integration.enabled and user_id: | |
| welcome_msg += f"\nπ§ Tool access enabled for {user_id}" | |
| welcome_msg += "\nπ‘ Use /setup-tools to authorize tool access" | |
| elif self.composio_integration.enabled: | |
| welcome_msg += "\nπ‘ Reconnect with email to enable tool access" | |
| self.send_message(client_socket, {"type": "system", "message": welcome_msg}) | |
| # Notify all clients about new user | |
| self.broadcast_message( | |
| { | |
| "type": "notification", | |
| "message": f"π {username} joined the chat", | |
| "timestamp": datetime.now().isoformat(), | |
| }, | |
| exclude=client_socket, | |
| ) | |
| print(f"β {username} ({address[0]}:{address[1]}) connected") | |
| print(f"π₯ Active clients: {len(self.clients)}") | |
| logging.info(f"Client {username} connected from {address}") | |
| # Send current client count | |
| self.send_client_count() | |
| # Handle client messages | |
| while self.running: | |
| try: | |
| data = client_socket.recv(1024) | |
| if not data: | |
| break | |
| message_str = data.decode("utf-8").strip() | |
| if not message_str: | |
| continue | |
| try: | |
| message_data = json.loads(message_str) | |
| self.process_message(client_socket, username, message_data) | |
| except json.JSONDecodeError: | |
| # Handle plain text messages for backward compatibility | |
| self.process_message( | |
| client_socket, | |
| username, | |
| {"type": "chat", "message": message_str}, | |
| ) | |
| except socket.timeout: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "system", | |
| "message": "β οΈ Connection timeout. Please send a message to stay connected.", | |
| }, | |
| ) | |
| except socket.error: | |
| break | |
| except Exception as e: | |
| logging.error(f"Error handling client {username or 'unknown'}: {e}") | |
| finally: | |
| self.disconnect_client(client_socket, username) | |
| def process_message(self, client_socket, username, message_data): | |
| """Process different types of messages""" | |
| msg_type = message_data.get("type", "chat") | |
| message = message_data.get("message", "").strip() | |
| if not message: | |
| return | |
| if msg_type == "chat": | |
| # Check if this might be a tool request | |
| user_id = self.client_user_ids.get(client_socket) | |
| if ( | |
| self.composio_integration.enabled | |
| and user_id | |
| and user_id in self.composio_integration.user_tools | |
| ): | |
| # Try to execute with tools | |
| self.execute_tool_message(client_socket, username, user_id, message) | |
| else: | |
| # Regular chat message | |
| chat_message = { | |
| "type": "chat", | |
| "username": username, | |
| "message": message, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| self.broadcast_message(chat_message) | |
| logging.info(f"Message from {username}: {message}") | |
| elif msg_type == "command": | |
| # Handle special commands | |
| self.handle_command(client_socket, username, message) | |
| def execute_tool_message(self, client_socket, username, user_id, message): | |
| """Execute message with tool support""" | |
| try: | |
| # Show that we're processing the request | |
| self.broadcast_message( | |
| { | |
| "type": "notification", | |
| "message": f"π§ {username} is using AI tools...", | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| ) | |
| # Execute with tools | |
| result = self.composio_integration.execute_with_tools(user_id, message) | |
| if result["success"]: | |
| # Show the result to everyone | |
| response_msg = f"π€ AI Response for {username}:\n" | |
| if result["ai_response"]: | |
| response_msg += f"{result['ai_response']}\n" | |
| if result["tool_calls_made"]: | |
| response_msg += f"π§ Tool Action Completed!\n" | |
| if result["result"]: | |
| response_msg += f"π Result: {str(result['result'])[:200]}..." | |
| self.broadcast_message( | |
| { | |
| "type": "tool_response", | |
| "username": username, | |
| "message": response_msg, | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| ) | |
| else: | |
| # Show error | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": f"Tool execution failed: {result['error']}", | |
| }, | |
| ) | |
| except Exception as e: | |
| logging.error(f"Tool execution error: {e}") | |
| self.send_message( | |
| client_socket, | |
| {"type": "error", "message": f"Tool execution error: {str(e)}"}, | |
| ) | |
| def handle_command(self, client_socket, username, command): | |
| """Handle special chat commands""" | |
| if command.startswith("/"): | |
| cmd_parts = command[1:].split() | |
| cmd = cmd_parts[0].lower() | |
| if cmd == "setup-tools": | |
| self.handle_setup_tools(client_socket, username, cmd_parts[1:]) | |
| elif cmd == "authorize-tools": | |
| self.handle_authorize_tools(client_socket, username) | |
| elif cmd == "help": | |
| help_text = """ | |
| Available commands: | |
| /help - Show this help message | |
| /users - List all online users | |
| /time - Show current server time | |
| /ping - Test connection | |
| /setup-tools [toolkit] - Setup tool access (e.g., /setup-tools gmail) | |
| /authorize-tools - Complete tool authorization | |
| """ | |
| self.send_message( | |
| client_socket, {"type": "system", "message": help_text} | |
| ) | |
| elif cmd == "users": | |
| with self.lock: | |
| users = list(self.clients.values()) | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "system", | |
| "message": f"Online users ({len(users)}): {', '.join(users)}", | |
| }, | |
| ) | |
| elif cmd == "time": | |
| current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| self.send_message( | |
| client_socket, | |
| {"type": "system", "message": f"Server time: {current_time}"}, | |
| ) | |
| elif cmd == "ping": | |
| self.send_message( | |
| client_socket, {"type": "system", "message": "π Pong!"} | |
| ) | |
| else: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": f"Unknown command: /{cmd}. Type /help for available commands.", | |
| }, | |
| ) | |
| def handle_setup_tools(self, client_socket, username, args): | |
| """Handle tool setup command""" | |
| if not self.composio_integration.enabled: | |
| self.send_message( | |
| client_socket, | |
| {"type": "error", "message": "Composio integration not available"}, | |
| ) | |
| return | |
| user_id = self.client_user_ids.get(client_socket) | |
| if not user_id: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": "Email required for tool access. Please reconnect with your email.", | |
| }, | |
| ) | |
| return | |
| # Default to Gmail if no toolkit specified | |
| toolkit = args[0].upper() if args else "GMAIL" | |
| # Setup tools | |
| result = self.composio_integration.setup_user_tools(user_id, [toolkit]) | |
| if result["success"]: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "system", | |
| "message": f"π Visit this URL to authorize {toolkit}:\nπ {result['redirect_url']}\n\nThen use /authorize-tools to complete setup", | |
| }, | |
| ) | |
| # Notify others | |
| self.broadcast_message( | |
| { | |
| "type": "notification", | |
| "message": f"π§ {username} is setting up {toolkit} tools", | |
| "timestamp": datetime.now().isoformat(), | |
| }, | |
| exclude=client_socket, | |
| ) | |
| else: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": f"Failed to setup tools: {result['error']}", | |
| }, | |
| ) | |
| def handle_authorize_tools(self, client_socket, username): | |
| """Handle tool authorization completion""" | |
| if not self.composio_integration.enabled: | |
| self.send_message( | |
| client_socket, | |
| {"type": "error", "message": "Composio integration not available"}, | |
| ) | |
| return | |
| user_id = self.client_user_ids.get(client_socket) | |
| if not user_id: | |
| self.send_message( | |
| client_socket, | |
| {"type": "error", "message": "Email required for tool access"}, | |
| ) | |
| return | |
| # Complete authorization | |
| result = self.composio_integration.complete_authorization(user_id) | |
| if result["success"]: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "system", | |
| "message": f"β Tool authorization complete! You now have access to {result['tool_count']} tools.\nπ‘ Just chat normally - AI will use tools when needed!", | |
| }, | |
| ) | |
| # Notify others | |
| self.broadcast_message( | |
| { | |
| "type": "notification", | |
| "message": f"π {username} now has AI tool access!", | |
| "timestamp": datetime.now().isoformat(), | |
| }, | |
| exclude=client_socket, | |
| ) | |
| else: | |
| self.send_message( | |
| client_socket, | |
| { | |
| "type": "error", | |
| "message": f"Authorization failed: {result['error']}", | |
| }, | |
| ) | |
| def send_message(self, client_socket, message_data): | |
| """Send message to a specific client""" | |
| try: | |
| message_json = json.dumps(message_data) + "\n" | |
| client_socket.send(message_json.encode("utf-8")) | |
| except Exception as e: | |
| logging.error(f"Error sending message to client: {e}") | |
| def broadcast_message(self, message_data, exclude=None): | |
| """Broadcast message to all connected clients""" | |
| with self.lock: | |
| disconnected_clients = [] | |
| for client_socket in list(self.clients.keys()): | |
| if client_socket == exclude: | |
| continue | |
| try: | |
| self.send_message(client_socket, message_data) | |
| except Exception as e: | |
| logging.error(f"Error broadcasting to client: {e}") | |
| disconnected_clients.append(client_socket) | |
| # Remove disconnected clients | |
| for client_socket in disconnected_clients: | |
| self.disconnect_client(client_socket, self.clients.get(client_socket)) | |
| def send_client_count(self): | |
| """Send current client count to all clients""" | |
| with self.lock: | |
| count = len(self.clients) | |
| self.broadcast_message({"type": "client_count", "count": count}) | |
| def disconnect_client(self, client_socket, username): | |
| """Handle client disconnection""" | |
| try: | |
| with self.lock: | |
| if client_socket in self.clients: | |
| username = self.clients[client_socket] | |
| del self.clients[client_socket] | |
| if client_socket in self.client_addresses: | |
| address = self.client_addresses[client_socket] | |
| del self.client_addresses[client_socket] | |
| print( | |
| f"β {username or 'Unknown'} ({address[0]}:{address[1]}) disconnected" | |
| ) | |
| if client_socket in self.client_user_ids: | |
| del self.client_user_ids[client_socket] | |
| print(f"π₯ Active clients: {len(self.clients)}") | |
| client_socket.close() | |
| if username: | |
| # Notify other clients | |
| self.broadcast_message( | |
| { | |
| "type": "notification", | |
| "message": f"π {username} left the chat", | |
| "timestamp": datetime.now().isoformat(), | |
| } | |
| ) | |
| logging.info(f"Client {username} disconnected") | |
| # Update client count | |
| self.send_client_count() | |
| except Exception as e: | |
| logging.error(f"Error disconnecting client: {e}") | |
| def server_monitor(self): | |
| """Monitor server status and display periodic updates""" | |
| while self.running: | |
| try: | |
| time.sleep(30) # Update every 30 seconds | |
| with self.lock: | |
| client_count = len(self.clients) | |
| if client_count > 0: | |
| print( | |
| f"π Server Status: {client_count} active clients - {datetime.now().strftime('%H:%M:%S')}" | |
| ) | |
| except Exception as e: | |
| logging.error(f"Server monitor error: {e}") | |
| def shutdown(self): | |
| """Gracefully shutdown the server""" | |
| print("\nπ Shutting down server...") | |
| self.running = False | |
| # Notify all clients about server shutdown | |
| self.broadcast_message( | |
| { | |
| "type": "system", | |
| "message": "π Server is shutting down. Thank you for using MCP Chat!", | |
| } | |
| ) | |
| # Close all client connections | |
| with self.lock: | |
| for client_socket in list(self.clients.keys()): | |
| try: | |
| client_socket.close() | |
| except: | |
| pass | |
| self.clients.clear() | |
| self.client_addresses.clear() | |
| self.client_user_ids.clear() | |
| # Close server socket | |
| if self.server_socket: | |
| try: | |
| self.server_socket.close() | |
| except: | |
| pass | |
| print("β Server shutdown complete") | |
| logging.info("Server shutdown complete") | |
| def main(): | |
| """Main server entry point""" | |
| try: | |
| # Get server configuration | |
| host = input("Enter server host (default: 127.0.0.1): ").strip() or "127.0.0.1" | |
| port_input = input("Enter server port (default: 12345): ").strip() | |
| port = int(port_input) if port_input else 12345 | |
| # Create and start server | |
| server = MCPChatServer(host, port) | |
| server.start_server() | |
| except KeyboardInterrupt: | |
| print("\nπ Server stopped by user") | |
| except ValueError: | |
| print("β Invalid port number") | |
| except Exception as e: | |
| print(f"β Server error: {e}") | |
| logging.error(f"Server startup error: {e}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment