Created
April 2, 2025 00:19
-
-
Save SonicDMG/2ce15db0b2a63e0ef23f35da485f590e to your computer and use it in GitHub Desktop.
An example relay.js for connecting OpenAI's Realtime WebSocket console to Langflow in voice (TTS) mode.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// relay.js - WebSocket relay server for text-to-speech functionality | |
import { WebSocketServer, WebSocket as NodeWebSocket } from 'ws'; | |
import { v4 as uuidv4 } from 'uuid'; | |
/**
 * RealtimeRelay - Manages WebSocket connections between frontend clients and a Langflow backend.
 *
 * Every frontend connection accepted by `listen()` gets its own backend WebSocket to
 * `${protocol}//${host}:${port}${path}/${flowId}/${sessionId}`. Messages are relayed in
 * both directions:
 * - Frontend messages sent while the backend is not yet open are queued per backend socket
 *   and flushed when it opens.
 * - Backend audio events (`response.audio.delta` / `response.audio.done`) are augmented with
 *   the conversation-item bookkeeping events the frontend expects.
 */
export class RealtimeRelay {
  /**
   * @param {object} [options]
   * @param {string} [options.flowId='your_flow_id'] - Langflow flow id (placeholder default; replace it).
   * @param {string} [options.sessionId='relay-session'] - Session id appended to the backend URL.
   * @param {string} [options.host='127.0.0.1'] - Backend host. `ws:` is used when it contains
   *   "localhost" or "127.0.0.1", `wss:` otherwise.
   * @param {number} [options.port=7860] - Backend port.
   * @param {string} [options.path='/api/v1/voice/ws/flow_tts'] - Backend WebSocket path prefix.
   * @param {number} [options.pingInterval=30000] - Keep-alive ping interval in milliseconds.
   * @param {number} [options.maxQueueSize=100] - Max frontend messages buffered per backend
   *   socket before the oldest one is dropped.
   */
  constructor({
    flowId = 'your_flow_id',
    sessionId = 'relay-session',
    host = '127.0.0.1',
    port = 7860,
    path = '/api/v1/voice/ws/flow_tts',
    pingInterval = 30000,
    maxQueueSize = 100,
  } = {}) {
    // Connection details
    this.flowId = flowId;
    this.sessionId = sessionId;
    this.path = path;

    // Plain ws: only for loopback hosts; anything else is assumed to need TLS.
    const protocol = host.includes('localhost') || host.includes('127.0.0.1') ? 'ws:' : 'wss:';
    this.backendUrl = `${protocol}//${host}:${port}${path}/${this.flowId}/${this.sessionId}`;
    this.log('🔌 Connecting to backend at:', this.backendUrl);

    // WebSocket server and connection management
    this.wss = null;
    this.pingInterval = pingInterval;
    this.maxQueueSize = maxQueueSize;
    this.activeConnections = new Set();
    // Pending frontend→backend messages keyed by backend socket, so concurrent
    // connections never reset or drain each other's queue.
    this.messageQueues = new WeakMap();
    // Synthetic conversation item id for audio events that arrive without an item_id,
    // keyed by frontend socket so it survives across the events of one response.
    this.fallbackItemIds = new WeakMap();

    // Voice settings with defaults (sent to the backend once it opens)
    this.voiceSettings = {
      voice: "alloy",
      provider: "openai"
    };

    // Connection retry state (see setupBackendConnection)
    this.connectionAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000; // Start with 1 second delay

    // Audio state tracking. NOTE(review): shared by all connections, and cleanup()
    // clears it on any disconnect.
    this.currentAudioTrack = null;
    this.activeAudioItems = new Map(); // item_id -> track_id
  }

  /**
   * Start the relay's WebSocket server and accept frontend connections.
   * @param {number} port - Port for the relay's own WebSocket server.
   */
  listen(port) {
    this.wss = new WebSocketServer({
      port,
      perMessageDeflate: true // Enable compression for better performance
    });
    this.wss.on('connection', this.connectionHandler.bind(this));
    this.log(`🟢 Relay listening at ws://localhost:${port}`);
  }

  /**
   * Handle a new frontend WebSocket connection: open its backend socket and wire
   * up relaying, keep-alive pings and cleanup.
   * @param {object} frontendSocket - The accepted frontend WebSocket.
   */
  async connectionHandler(frontendSocket) {
    this.log(`🔀 Using path: ${this.path}`);

    let backendSocket;
    try {
      backendSocket = await this.setupBackendConnection(frontendSocket);
    } catch (error) {
      // The 'connection' emitter ignores our returned promise, so an uncaught
      // rejection here would become an unhandled promise rejection.
      this.log(`❌ Could not create backend connection:`, error?.message);
      if (frontendSocket.readyState === NodeWebSocket.OPEN) {
        frontendSocket.close(1011, 'Backend unavailable');
      }
      return;
    }

    this.activeConnections.add(frontendSocket);
    const pingTimer = this.setupKeepAlive(frontendSocket, backendSocket);

    this.setupFrontendHandlers(frontendSocket, backendSocket, pingTimer);
    // The fallback item id is tracked per frontend socket in this.fallbackItemIds,
    // so there is no initial value to pass.
    this.setupBackendHandlers(backendSocket, frontendSocket, pingTimer, null);

    // If the frontend went away while we awaited the backend, its 'close' event fired
    // before our handler existed; release both sockets now.
    if (frontendSocket.readyState !== NodeWebSocket.OPEN) {
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    }
  }

  /**
   * Create the backend WebSocket for a frontend connection.
   *
   * Also switches the frontend socket to `binaryType = 'arraybuffer'`. The returned
   * socket may still be CONNECTING; the 'open' handler installed by
   * setupBackendHandlers() does the post-handshake work.
   *
   * NOTE(review): the ws client reports network failures asynchronously through its
   * 'error' event, so this retry loop only covers errors thrown synchronously by the
   * constructor (e.g. an invalid URL). Retrying real connection failures would require
   * waiting for 'open'/'error' here.
   *
   * @param {object} frontendSocket - The frontend WebSocket this backend serves.
   * @returns {Promise<object>} The backend WebSocket.
   * @throws The last construction error once `maxReconnectAttempts` retries are exhausted.
   */
  async setupBackendConnection(frontendSocket) {
    frontendSocket.binaryType = 'arraybuffer';
    const connectWithRetry = async (attempt = 0) => {
      try {
        const socket = new NodeWebSocket(this.backendUrl, {
          perMessageDeflate: true,
          handshakeTimeout: 5000,
        });
        // Reset connection attempts on successful connection
        socket.on('open', () => {
          this.connectionAttempts = 0;
          this.reconnectDelay = 1000;
        });
        return socket;
      } catch (error) {
        if (attempt < this.maxReconnectAttempts) {
          this.log(`⚠️ Connection attempt ${attempt + 1} failed, retrying in ${this.reconnectDelay}ms...`);
          await new Promise(resolve => setTimeout(resolve, this.reconnectDelay));
          this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000); // Exponential backoff, max 10 seconds
          return connectWithRetry(attempt + 1);
        }
        throw error;
      }
    };
    return connectWithRetry();
  }

  /**
   * Ping both sockets every `pingInterval` ms while they are open.
   * @returns {NodeJS.Timer} Interval timer, cleared by cleanup().
   */
  setupKeepAlive(frontendSocket, backendSocket) {
    return setInterval(() => {
      if (frontendSocket.readyState === NodeWebSocket.OPEN) frontendSocket.ping();
      if (backendSocket.readyState === NodeWebSocket.OPEN) backendSocket.ping();
    }, this.pingInterval);
  }

  /**
   * Attach message, close and error handlers to the frontend socket.
   */
  setupFrontendHandlers(frontendSocket, backendSocket, pingTimer) {
    frontendSocket.on('message', (data) => {
      this.handleFrontendMessage(data, backendSocket, frontendSocket);
    });
    frontendSocket.on('close', () => {
      this.log(`❌ Frontend closed, closing backend`);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
    frontendSocket.on('error', (error) => {
      this.log(`⚠️ Frontend socket error:`, error.message);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
  }

  /**
   * Attach open, message, close and error handlers to the backend socket.
   * @param {string|null} [fallbackItemId] - Optional item id to use for audio events that
   *   lack one; normally null, since the id is tracked per frontend socket.
   */
  setupBackendHandlers(backendSocket, frontendSocket, pingTimer, fallbackItemId) {
    backendSocket.on('open', () => {
      this.handleBackendOpen(backendSocket, frontendSocket);
    });
    backendSocket.on('message', (data) => {
      this.handleBackendMessage(data, frontendSocket, fallbackItemId);
    });
    backendSocket.on('close', () => {
      this.log(`❌ Backend closed, closing frontend`);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
    backendSocket.on('error', (error) => {
      this.log(`⚠️ Backend socket error:`, error.message);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
  }

  /**
   * Parse JSON, returning `undefined` instead of throwing on malformed input.
   * @param {string} text
   * @returns {*} The parsed value, or undefined when `text` is not valid JSON.
   */
  tryParseJson(text) {
    try {
      return JSON.parse(text);
    } catch {
      return undefined;
    }
  }

  /**
   * Handle a message from the frontend.
   *
   * `response.interrupt` requests are converted into an audio cancel/interrupt pair,
   * as long as an item id can be determined. Everything else, JSON or not, is
   * forwarded to the backend verbatim.
   */
  handleFrontendMessage(data, backendSocket, frontendSocket) {
    const text = this.normalize(data);
    const message = this.tryParseJson(text);

    if (message?.type === 'response.interrupt') {
      // Without an explicit item_id, interrupt the most recently tracked audio item.
      const itemId = message.item_id || Array.from(this.activeAudioItems.keys()).pop();
      if (itemId) {
        this.handleAudioInterrupt(frontendSocket, backendSocket, itemId);
        return;
      }
    }

    this.log(`➡️ Frontend → Backend:`, this.truncate(text));
    // Forward the original text rather than a re-serialization. This keeps JSON strings
    // quoted and avoids the precision loss that JSON.parse introduces for large integers.
    this.sendToBackend(text, backendSocket);
  }

  /**
   * Return (creating on first use) the pending-message queue for a backend socket.
   * @returns {string[]}
   */
  getMessageQueue(backendSocket) {
    let queue = this.messageQueues.get(backendSocket);
    if (!queue) {
      queue = [];
      this.messageQueues.set(backendSocket, queue);
    }
    return queue;
  }

  /**
   * Send a message to the backend, or queue it while the backend is not open.
   * The queue is capped at `maxQueueSize`; the oldest message is dropped on overflow.
   * @param {string|object} message - Strings are sent as-is; other values are JSON-encoded.
   */
  sendToBackend(message, backendSocket) {
    const messageStr = typeof message === 'string' ? message : JSON.stringify(message);
    if (backendSocket.readyState === NodeWebSocket.OPEN) {
      backendSocket.send(messageStr, { compress: true });
      return;
    }
    const queue = this.getMessageQueue(backendSocket);
    if (queue.length >= this.maxQueueSize) {
      this.log('⚠️ Message queue full - dropping oldest message');
      queue.shift();
    }
    queue.push(messageStr);
  }

  /**
   * Once the backend is open, send the voice settings and then flush queued messages.
   */
  handleBackendOpen(backendSocket, frontendSocket) {
    const voiceSettingsMsg = JSON.stringify({
      type: "voice.settings",
      ...this.voiceSettings,
      event_id: `relay_evt_${uuidv4()}`,
    });
    backendSocket.send(voiceSettingsMsg, { compress: true });
    this.log(`🗣️ Sent voice settings: ${this.voiceSettings.voice} / ${this.voiceSettings.provider}`);
    this.log(`✅ Connected to Langflow`);
    this.flushMessageQueue(backendSocket);
  }

  /**
   * Send, in order, every message queued for this backend socket, then empty its queue.
   */
  flushMessageQueue(backendSocket) {
    const queue = this.messageQueues.get(backendSocket);
    if (!queue || queue.length === 0) return;
    const messages = queue.splice(0);
    // Each send callback resolves (with or without an error) so the log always fires.
    Promise.all(messages.map(msg =>
      new Promise(resolve => backendSocket.send(msg, { compress: true }, resolve))
    )).then(() => {
      this.log(`🔁 Flushed ${messages.length} queued messages`);
    });
  }

  /**
   * Handle a message from the backend.
   *
   * Ensures each event has an `event_id` and runs audio bookkeeping for
   * `response.audio.delta` / `response.audio.done`, then forwards the event to the
   * frontend. Payloads that are not JSON objects or arrays are logged and dropped.
   * @param {string|null} [fallbackItemId] - Optional item id for audio events lacking one.
   */
  handleBackendMessage(data, frontendSocket, fallbackItemId) {
    const text = this.normalize(data);
    const event = this.tryParseJson(text);
    if (event === null || typeof event !== 'object') {
      this.log(`❌ Failed to parse backend message:`, text);
      return;
    }

    if (!event.event_id) {
      event.event_id = `relay_${uuidv4()}`;
    }

    if (event.type === 'response.audio.delta') {
      // Run first so that events without an item_id receive the fallback item id
      // before we record the item→track mapping.
      this.handleAudioEvent(event, frontendSocket, fallbackItemId);
      if (event.track_id && event.item_id) {
        this.activeAudioItems.set(event.item_id, event.track_id);
        this.currentAudioTrack = event.track_id;
      }
    } else if (event.type === 'response.audio.done') {
      if (event.item_id) {
        this.releaseAudioItem(event.item_id);
      }
      this.handleAudioEvent(event, frontendSocket, fallbackItemId);
    }

    if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
      const payload = JSON.stringify(event);
      frontendSocket.send(payload, { compress: true });
      this.log(`⬅️ Backend → Frontend:`, this.truncate(payload));
    }
  }

  /**
   * Forget an audio item. If its track is the current track, clear `currentAudioTrack`.
   * The track id is read before deletion so the comparison is meaningful.
   */
  releaseAudioItem(itemId) {
    const trackId = this.activeAudioItems.get(itemId);
    this.activeAudioItems.delete(itemId);
    if (trackId !== undefined && this.currentAudioTrack === trackId) {
      this.currentAudioTrack = null;
    }
  }

  /**
   * Interrupt audio for an item. The steps run in this order:
   * 1. Tell the frontend to cancel playback.
   * 2. Tell the backend to stop.
   * 3. Drop the item's tracking state.
   * 4. Send the completion events.
   */
  handleAudioInterrupt(frontendSocket, backendSocket, itemId) {
    const trackId = this.activeAudioItems.get(itemId);

    // Cancel on the frontend first so playback stops as soon as possible.
    const frontendCancelMsg = JSON.stringify({
      type: 'response.audio.cancel',
      event_id: `relay_evt_${uuidv4()}`,
      item_id: itemId,
      track_id: trackId || 'current', // Ensure we always send a track_id
      status: 'cancelled'
    });
    if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
      frontendSocket.send(frontendCancelMsg, { compress: true });
    }

    const backendInterruptMsg = JSON.stringify({
      type: 'response.interrupt',
      event_id: `relay_evt_${uuidv4()}`,
      item_id: itemId,
      track_id: trackId || 'current'
    });
    if (backendSocket?.readyState === NodeWebSocket.OPEN) {
      backendSocket.send(backendInterruptMsg, { compress: true });
    }

    this.log(`🛑 Cancelled audio playback for item: ${itemId}`);
    this.releaseAudioItem(itemId);
    this.handleAudioCompletion(frontendSocket, itemId);
  }

  /**
   * Audio bookkeeping for one backend event.
   *
   * An event without an `item_id` reuses this frontend's fallback item. If there is
   * none yet, a new synthetic item is created once. On `response.audio.done` or
   * `response.audio.cancel`, the item is completed and the fallback is reset.
   * Mutates `event.item_id`.
   * @param {string|null} [fallbackItemId] - Optional caller-supplied fallback item id.
   */
  handleAudioEvent(event, frontendSocket, fallbackItemId = null) {
    let itemId = event.item_id || fallbackItemId || this.fallbackItemIds.get(frontendSocket) || null;
    if (!itemId) {
      itemId = this.createConversationItem(frontendSocket);
      this.fallbackItemIds.set(frontendSocket, itemId);
    }
    if (!event.item_id) {
      event.item_id = itemId;
    }

    if (event.type === 'response.audio.done' || event.type === 'response.audio.cancel') {
      this.handleAudioCompletion(frontendSocket, itemId);
      this.fallbackItemIds.delete(frontendSocket);
    }
  }

  /**
   * Send the frontend a synthetic `conversation.item.created` and `response.created`
   * pair for a new assistant item.
   * @returns {string} The new item id.
   */
  createConversationItem(frontendSocket) {
    const itemId = `relay_item_${uuidv4()}`;
    const syntheticMessages = [
      {
        type: 'conversation.item.created',
        event_id: `relay_evt_${uuidv4()}`,
        item: {
          id: itemId,
          object: 'realtime.conversation.item',
          type: 'message',
          role: 'assistant',
          content: [],
          status: 'in_progress',
        },
        previous_item_id: null,
      },
      {
        type: 'response.created',
        event_id: `relay_evt_${uuidv4()}`,
        response: {
          object: 'realtime.response',
          id: `relay_resp_${uuidv4()}`,
          status: 'in_progress',
        },
        item_id: itemId,
      }
    ];
    for (const msg of syntheticMessages) {
      frontendSocket.send(JSON.stringify(msg), { compress: true });
    }
    this.log(`✨ Injected synthetic messages with item_id: ${itemId}`);
    return itemId;
  }

  /**
   * Release an item's audio state and, if the frontend is open, send it the completion
   * events (`input_audio_buffer.speech_stopped`, `conversation.item.updated`).
   */
  handleAudioCompletion(frontendSocket, itemId) {
    this.releaseAudioItem(itemId);
    // A completed synthetic item must not be reused by the next response.
    if (this.fallbackItemIds.get(frontendSocket) === itemId) {
      this.fallbackItemIds.delete(frontendSocket);
    }

    // NOTE(review): these are wall-clock epoch milliseconds, not offsets into an audio
    // buffer; confirm this is what the frontend expects.
    const now = Date.now();
    const completionMessages = [
      {
        type: 'input_audio_buffer.speech_stopped',
        event_id: `relay_evt_${uuidv4()}`,
        payload: {
          audio_end_ms: now,
          audio: {
            end_ms: now
          }
        }
      },
      {
        type: 'conversation.item.updated',
        event_id: `relay_evt_${uuidv4()}`,
        item_id: itemId,
        updates: {
          status: 'completed',
        },
      }
    ];
    if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
      for (const msg of completionMessages) {
        frontendSocket.send(JSON.stringify(msg), { compress: true });
      }
      this.log(`✅ Completed conversation item: ${itemId}`);
    }
  }

  /**
   * Release every resource tied to one connection. Safe to call more than once,
   * because both sockets' close/error handlers call it.
   */
  cleanup(pingTimer, frontendSocket, backendSocket) {
    try {
      // NOTE(review): audio tracking is shared, so this clears it for all connections.
      this.currentAudioTrack = null;
      this.activeAudioItems.clear();
      clearInterval(pingTimer);
      this.activeConnections.delete(frontendSocket);
      this.fallbackItemIds.delete(frontendSocket);
      this.messageQueues.delete(backendSocket);
      if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
        frontendSocket.close(1000, 'Normal closure');
      }
      // Also abort a backend that is still connecting. Otherwise it would open later
      // with no frontend attached and never be closed.
      if (
        backendSocket?.readyState === NodeWebSocket.OPEN ||
        backendSocket?.readyState === NodeWebSocket.CONNECTING
      ) {
        backendSocket.close(1000, 'Normal closure');
      }
    } catch (error) {
      this.log(`⚠️ Error during cleanup:`, error.message);
    }
  }

  /**
   * Decode incoming message data as UTF-8 text. Handles strings, Buffers,
   * ArrayBuffers (binaryType 'arraybuffer'), typed-array views, and
   * Buffer[] fragments.
   * @returns {string}
   */
  normalize(data) {
    if (typeof data === 'string') return data;
    if (Buffer.isBuffer(data)) return data.toString('utf-8');
    if (Array.isArray(data)) return Buffer.concat(data).toString('utf-8');
    if (data instanceof ArrayBuffer) return Buffer.from(data).toString('utf-8');
    if (ArrayBuffer.isView(data)) {
      return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('utf-8');
    }
    return String(data);
  }

  /**
   * Truncate long strings for logging.
   */
  truncate(str, len = 180) {
    return str.length > len ? str.slice(0, len) + '…' : str;
  }

  /**
   * Log messages with a consistent prefix.
   */
  log(...args) {
    console.log('[RealtimeRelay]', ...args);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment