Last active
March 3, 2025 21:09
-
-
Save qwertychouskie/b3a39d587406ae6f16ebda2daa267204 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# OBS Scene autoswitcher | |
# https://claude.ai/share/8fe02bf6-febe-420e-bee2-db39d224a3df | |
import json | |
import time | |
import asyncio | |
import signal | |
import threading | |
import websockets | |
import obswebsocket | |
from obswebsocket import requests as obsrequests | |
import tkinter as tk | |
from tkinter import ttk, messagebox, scrolledtext | |
class FTCFieldSwitcher: | |
def __init__(self, event_code, obs_host="localhost", obs_port=4444, obs_password=""): | |
""" | |
Initialize the FTC Field Switcher with WebSocket connection details and OBS parameters | |
Args: | |
event_code: FTC event code for the WebSocket connection | |
obs_host: OBS WebSocket host (default: localhost) | |
obs_port: OBS WebSocket port (default: 4444) | |
obs_password: OBS WebSocket password (default: empty string) | |
""" | |
self.ftc_ws_url = f"ws://localhost:8080/stream/display/command/?code={event_code}" | |
self.event_code = event_code | |
self.obs_host = obs_host | |
self.obs_port = obs_port | |
self.obs_password = obs_password | |
self.obs_ws = None | |
self.current_field = None | |
self.field_scene_mapping = { | |
1: "Field 1", | |
2: "Field 2", | |
# Add more field-to-scene mappings as needed | |
} | |
self.running = False | |
self.ftc_websocket = None | |
self.log_callback = None | |
def set_log_callback(self, callback): | |
""" | |
Set a callback function for logging | |
Args: | |
callback: Function that accepts a string message | |
""" | |
self.log_callback = callback | |
def log(self, message): | |
""" | |
Log a message to console and via callback if set | |
Args: | |
message: Message to log | |
""" | |
print(message) | |
if self.log_callback: | |
self.log_callback(message) | |
def connect_to_obs(self): | |
"""Establish connection to OBS WebSocket server""" | |
self.obs_ws = obswebsocket.obsws(self.obs_host, self.obs_port, self.obs_password) | |
try: | |
self.obs_ws.connect() | |
self.log("Connected to OBS WebSocket server") | |
return True | |
except Exception as e: | |
self.log(f"Error connecting to OBS: {e}") | |
return False | |
def disconnect_from_obs(self): | |
"""Disconnect from OBS WebSocket server""" | |
if self.obs_ws and self.obs_ws.ws.connected: | |
try: | |
self.obs_ws.disconnect() | |
self.log("Disconnected from OBS WebSocket server") | |
except Exception as e: | |
self.log(f"Error disconnecting from OBS: {e}") | |
def update_field_scene_mapping(self, field_scene_dict): | |
""" | |
Update the mapping between fields and OBS scenes | |
Args: | |
field_scene_dict: Dictionary mapping field numbers to scene names | |
""" | |
self.field_scene_mapping.update(field_scene_dict) | |
self.log("Updated field-to-scene mapping") | |
def switch_scene(self, field_number): | |
""" | |
Switch OBS scene based on field number | |
Args: | |
field_number: Field number (integer) | |
Returns: | |
bool: Success status | |
""" | |
if field_number not in self.field_scene_mapping: | |
self.log(f"No scene mapping found for Field {field_number}") | |
return False | |
scene_name = self.field_scene_mapping[field_number] | |
try: | |
response = self.obs_ws.call(obsrequests.SetCurrentProgramScene(sceneName=scene_name)) | |
if response.status: | |
self.log(f"Switched to scene: {scene_name} for Field {field_number}") | |
return True | |
else: | |
self.log(f"Failed to switch scene: {response.error}") | |
return False | |
except Exception as e: | |
self.log(f"Error switching scene: {e}") | |
return False | |
async def shutdown(self): | |
""" | |
Gracefully shutdown the monitoring | |
""" | |
self.log("\nShutting down...") | |
self.running = False | |
# Close FTC WebSocket if it exists | |
if self.ftc_websocket and not self.ftc_websocket.closed: | |
await self.ftc_websocket.close() | |
self.log("Closed FTC WebSocket connection") | |
# Disconnect from OBS | |
self.disconnect_from_obs() | |
self.log("Shutdown complete") | |
async def monitor_ftc_websocket(self): | |
""" | |
Connect to FTC system WebSocket and monitor for SHOW_MATCH messages | |
""" | |
if not self.connect_to_obs(): | |
self.log("Failed to connect to OBS. Exiting.") | |
return | |
self.log(f"Connecting to FTC WebSocket: {self.ftc_ws_url}") | |
self.log(f"Current field-scene mapping: {json.dumps(self.field_scene_mapping, indent=2)}") | |
self.running = True | |
try: | |
async with websockets.connect(self.ftc_ws_url) as websocket: | |
self.ftc_websocket = websocket | |
self.log("Connected to FTC scoring system WebSocket") | |
while self.running: | |
try: | |
# Set a timeout so we can check if we should still be running | |
message = await asyncio.wait_for(websocket.recv(), timeout=1.0) | |
data = json.loads(message) | |
# Check if it's a SHOW_MATCH message | |
if data.get("type") == "SHOW_MATCH": | |
# Extract field number | |
field_number = data.get("field") | |
if field_number is None and "params" in data: | |
field_number = data["params"].get("field") | |
if field_number is not None and field_number != self.current_field: | |
self.log(f"Field change detected: {self.current_field} -> {field_number}") | |
if self.switch_scene(field_number): | |
self.current_field = field_number | |
except asyncio.TimeoutError: | |
# This is expected - it's just to allow checking self.running periodically | |
continue | |
except json.JSONDecodeError as e: | |
if message != "pong": # This is expected to show up periodically | |
self.log(f"Error decoding message: {e}") | |
except Exception as e: | |
if self.running: # Only print errors if we're still supposed to be running | |
self.log(f"Error processing message: {e}") | |
except asyncio.CancelledError: | |
self.log("WebSocket monitoring cancelled") | |
except websockets.exceptions.ConnectionClosed: | |
self.log("Connection to FTC scoring system closed") | |
except Exception as e: | |
if self.running: # Only print errors if we're still supposed to be running | |
self.log(f"WebSocket error: {e}") | |
finally: | |
await self.shutdown() | |
class FTCSwitcherGUI: | |
def __init__(self, root): | |
self.root = root | |
self.root.title("FTC OBS Scene Switcher") | |
self.root.geometry("700x600") | |
self.root.resizable(True, True) | |
self.switcher = None | |
self.async_loop = None | |
self.monitor_task = None | |
self.thread = None | |
self.create_widgets() | |
self.load_config() | |
def create_widgets(self): | |
# Main frame | |
main_frame = ttk.Frame(self.root, padding="10") | |
main_frame.pack(fill=tk.BOTH, expand=True) | |
# Create notebook with tabs | |
notebook = ttk.Notebook(main_frame) | |
notebook.pack(fill=tk.BOTH, expand=True, pady=5) | |
# Connection settings tab | |
conn_frame = ttk.Frame(notebook, padding="10") | |
notebook.add(conn_frame, text="Connection Settings") | |
# FTC Settings | |
ttk.Label(conn_frame, text="FTC Settings", font=("", 12, "bold")).grid(row=0, column=0, columnspan=2, sticky=tk.W, pady=(0, 5)) | |
ttk.Label(conn_frame, text="Event Code:").grid(row=1, column=0, sticky=tk.W, pady=2) | |
self.event_code_var = tk.StringVar() | |
ttk.Entry(conn_frame, textvariable=self.event_code_var, width=30).grid(row=1, column=1, sticky=tk.W, pady=2) | |
# OBS Settings | |
ttk.Label(conn_frame, text="OBS Settings", font=("", 12, "bold")).grid(row=2, column=0, columnspan=2, sticky=tk.W, pady=(10, 5)) | |
ttk.Label(conn_frame, text="Host:").grid(row=3, column=0, sticky=tk.W, pady=2) | |
self.obs_host_var = tk.StringVar(value="localhost") | |
ttk.Entry(conn_frame, textvariable=self.obs_host_var, width=30).grid(row=3, column=1, sticky=tk.W, pady=2) | |
ttk.Label(conn_frame, text="Port:").grid(row=4, column=0, sticky=tk.W, pady=2) | |
self.obs_port_var = tk.StringVar(value="4444") | |
ttk.Entry(conn_frame, textvariable=self.obs_port_var, width=30).grid(row=4, column=1, sticky=tk.W, pady=2) | |
ttk.Label(conn_frame, text="Password:").grid(row=5, column=0, sticky=tk.W, pady=2) | |
self.obs_password_var = tk.StringVar() | |
ttk.Entry(conn_frame, textvariable=self.obs_password_var, width=30, show="*").grid(row=5, column=1, sticky=tk.W, pady=2) | |
# Scene Mapping tab | |
mapping_frame = ttk.Frame(notebook, padding="10") | |
notebook.add(mapping_frame, text="Scene Mapping") | |
# Scene Mapping | |
ttk.Label(mapping_frame, text="Field to Scene Mapping", font=("", 12, "bold")).grid(row=0, column=0, columnspan=2, sticky=tk.W, pady=(0, 10)) | |
# Create a frame for the mapping entries | |
mapping_entries_frame = ttk.Frame(mapping_frame) | |
mapping_entries_frame.grid(row=1, column=0, columnspan=2, sticky=tk.W, pady=5) | |
# Create mapping entries for fields 1 and 2 | |
self.scene_mappings = {} | |
for i in range(1, 3): | |
ttk.Label(mapping_entries_frame, text=f"Field {i}:").grid(row=i-1, column=0, sticky=tk.W, pady=2) | |
scene_var = tk.StringVar(value=f"Field {i}") | |
ttk.Entry(mapping_entries_frame, textvariable=scene_var, width=30).grid(row=i-1, column=1, sticky=tk.W, pady=2) | |
self.scene_mappings[i] = scene_var | |
# Add button | |
add_frame = ttk.Frame(mapping_frame) | |
add_frame.grid(row=2, column=0, columnspan=2, sticky=tk.W, pady=10) | |
self.new_field_var = tk.StringVar() | |
self.new_scene_var = tk.StringVar() | |
ttk.Label(add_frame, text="New Field Number:").grid(row=0, column=0, sticky=tk.W, pady=2) | |
ttk.Entry(add_frame, textvariable=self.new_field_var, width=10).grid(row=0, column=1, sticky=tk.W, pady=2) | |
ttk.Label(add_frame, text="Scene Name:").grid(row=0, column=2, sticky=tk.W, pady=2, padx=(10, 0)) | |
ttk.Entry(add_frame, textvariable=self.new_scene_var, width=20).grid(row=0, column=3, sticky=tk.W, pady=2) | |
ttk.Button(add_frame, text="Add Mapping", command=self.add_mapping).grid(row=0, column=4, sticky=tk.W, pady=2, padx=(10, 0)) | |
# Control & Log Frame | |
control_frame = ttk.Frame(main_frame, padding="10") | |
control_frame.pack(fill=tk.BOTH, expand=True, pady=5) | |
# Control buttons | |
button_frame = ttk.Frame(control_frame) | |
button_frame.pack(fill=tk.X) | |
self.start_button = ttk.Button(button_frame, text="Start Monitoring", command=self.start_monitoring) | |
self.start_button.pack(side=tk.LEFT, padx=5) | |
self.stop_button = ttk.Button(button_frame, text="Stop Monitoring", command=self.stop_monitoring, state=tk.DISABLED) | |
self.stop_button.pack(side=tk.LEFT, padx=5) | |
ttk.Button(button_frame, text="Save Config", command=self.save_config).pack(side=tk.LEFT, padx=5) | |
# Status indicator | |
self.status_var = tk.StringVar(value="Status: Not Running") | |
status_label = ttk.Label(button_frame, textvariable=self.status_var) | |
status_label.pack(side=tk.RIGHT, padx=5) | |
# Log area | |
ttk.Label(control_frame, text="Log", font=("", 10, "bold")).pack(anchor=tk.W, pady=(10, 5)) | |
self.log_text = scrolledtext.ScrolledText(control_frame, height=10) | |
self.log_text.pack(fill=tk.BOTH, expand=True) | |
self.log_text.config(state=tk.DISABLED) | |
def add_mapping(self): | |
try: | |
field_num = int(self.new_field_var.get()) | |
scene_name = self.new_scene_var.get() | |
if not scene_name: | |
messagebox.showerror("Error", "Scene name cannot be empty") | |
return | |
# Create new entry if it doesn't exist | |
if field_num not in self.scene_mappings: | |
row_num = len(self.scene_mappings) + 1 | |
mapping_frame = self.root.nametowidget(self.root.winfo_children()[0].winfo_children()[0].winfo_children()[1]) | |
entries_frame = mapping_frame.winfo_children()[1] | |
ttk.Label(entries_frame, text=f"Field {field_num} Scene:").grid(row=row_num, column=0, sticky=tk.W, pady=2) | |
scene_var = tk.StringVar(value=scene_name) | |
ttk.Entry(entries_frame, textvariable=scene_var, width=30).grid(row=row_num, column=1, sticky=tk.W, pady=2) | |
self.scene_mappings[field_num] = scene_var | |
else: | |
# Update existing entry | |
self.scene_mappings[field_num].set(scene_name) | |
# Clear entry fields | |
self.new_field_var.set("") | |
self.new_scene_var.set("") | |
self.log(f"Added mapping: Field {field_num} -> {scene_name}") | |
except ValueError: | |
messagebox.showerror("Error", "Field number must be a valid integer") | |
def get_field_scene_mapping(self): | |
mapping = {} | |
for field_num, scene_var in self.scene_mappings.items(): | |
mapping[field_num] = scene_var.get() | |
return mapping | |
def log(self, message): | |
self.log_text.config(state=tk.NORMAL) | |
self.log_text.insert(tk.END, f"{time.strftime('%H:%M:%S')} - {message}\n") | |
self.log_text.see(tk.END) | |
self.log_text.config(state=tk.DISABLED) | |
def save_config(self): | |
config = { | |
"event_code": self.event_code_var.get(), | |
"obs_host": self.obs_host_var.get(), | |
"obs_port": self.obs_port_var.get(), | |
"obs_password": self.obs_password_var.get(), | |
"scene_mapping": {int(k): v.get() for k, v in self.scene_mappings.items()} | |
} | |
try: | |
with open("ftc_obs_config.json", "w") as f: | |
json.dump(config, f, indent=2) | |
self.log("Configuration saved") | |
except Exception as e: | |
self.log(f"Error saving configuration: {e}") | |
messagebox.showerror("Error", f"Failed to save configuration: {e}") | |
def load_config(self): | |
try: | |
with open("ftc_obs_config.json", "r") as f: | |
config = json.load(f) | |
self.event_code_var.set(config.get("event_code", "")) | |
self.obs_host_var.set(config.get("obs_host", "localhost")) | |
self.obs_port_var.set(config.get("obs_port", "4444")) | |
self.obs_password_var.set(config.get("obs_password", "")) | |
# Load scene mappings | |
scene_mapping = config.get("scene_mapping", {}) | |
for field_str, scene in scene_mapping.items(): | |
field = int(field_str) | |
if field in self.scene_mappings: | |
self.scene_mappings[field].set(scene) | |
else: | |
# Need to add this mapping to the UI | |
self.new_field_var.set(str(field)) | |
self.new_scene_var.set(scene) | |
self.add_mapping() | |
self.log("Configuration loaded") | |
except FileNotFoundError: | |
self.log("No configuration file found, using defaults") | |
except Exception as e: | |
self.log(f"Error loading configuration: {e}") | |
def start_monitoring(self): | |
# Get configuration values | |
event_code = self.event_code_var.get() | |
obs_host = self.obs_host_var.get() | |
obs_port = self.obs_port_var.get() | |
obs_password = self.obs_password_var.get() | |
if not event_code: | |
messagebox.showerror("Error", "Event code is required") | |
return | |
try: | |
obs_port = int(obs_port) | |
except ValueError: | |
messagebox.showerror("Error", "OBS port must be a valid number") | |
return | |
# Create switcher instance | |
self.switcher = FTCFieldSwitcher( | |
event_code=event_code, | |
obs_host=obs_host, | |
obs_port=obs_port, | |
obs_password=obs_password | |
) | |
# Set logging callback | |
self.switcher.set_log_callback(self.log) | |
# Update field-scene mapping | |
self.switcher.update_field_scene_mapping(self.get_field_scene_mapping()) | |
# Create new event loop | |
self.async_loop = asyncio.new_event_loop() | |
# Start monitoring in a separate thread | |
self.thread = threading.Thread(target=self.run_async_monitoring, daemon=True) | |
self.thread.start() | |
# Update UI state | |
self.start_button.config(state=tk.DISABLED) | |
self.stop_button.config(state=tk.NORMAL) | |
self.status_var.set("Status: Running") | |
self.log("Monitoring started") | |
def run_async_monitoring(self): | |
"""Run the async monitoring in a separate thread with its own event loop""" | |
asyncio.set_event_loop(self.async_loop) | |
self.monitor_task = self.async_loop.create_task(self.switcher.monitor_ftc_websocket()) | |
try: | |
self.async_loop.run_until_complete(self.monitor_task) | |
except asyncio.CancelledError: | |
pass | |
finally: | |
# When the task is done, update UI back on the main thread | |
self.root.after(0, self.update_ui_after_stop) | |
def stop_monitoring(self): | |
"""Stop the monitoring process""" | |
if self.switcher and self.switcher.running: | |
self.log("Stopping monitoring...") | |
# Cancel the monitoring task | |
if self.monitor_task and not self.monitor_task.done(): | |
self.async_loop.call_soon_threadsafe(self.monitor_task.cancel) | |
# Schedule task to gracefully shutdown | |
shutdown_task = asyncio.run_coroutine_threadsafe(self.switcher.shutdown(), self.async_loop) | |
# Wait for shutdown to complete (with timeout) | |
try: | |
shutdown_task.result(timeout=5) | |
except concurrent.futures.TimeoutError: | |
self.log("Shutdown timed out, forcing exit") | |
except Exception as e: | |
self.log(f"Error during shutdown: {e}") | |
# Set switcher as not running | |
self.switcher.running = False | |
def update_ui_after_stop(self): | |
"""Update UI elements after monitoring has stopped""" | |
self.start_button.config(state=tk.NORMAL) | |
self.stop_button.config(state=tk.DISABLED) | |
self.status_var.set("Status: Not Running") | |
self.log("Monitoring stopped") | |
def on_closing(self): | |
"""Handle window close event""" | |
if self.switcher and self.switcher.running: | |
self.stop_monitoring() | |
# Give it a moment to clean up | |
time.sleep(1) | |
self.root.destroy() | |
if __name__ == "__main__": | |
# Fix for high DPI displays | |
try: | |
from ctypes import windll | |
windll.shcore.SetProcessDpiAwareness(1) | |
except: | |
pass | |
# Import for run_coroutine_threadsafe | |
import concurrent.futures | |
root = tk.Tk() | |
app = FTCSwitcherGUI(root) | |
root.protocol("WM_DELETE_WINDOW", app.on_closing) | |
root.mainloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment