Skip to content

Instantly share code, notes, and snippets.

@SonicDMG
Created April 2, 2025 00:19
Show Gist options
  • Save SonicDMG/2ce15db0b2a63e0ef23f35da485f590e to your computer and use it in GitHub Desktop.
Save SonicDMG/2ce15db0b2a63e0ef23f35da485f590e to your computer and use it in GitHub Desktop.
An example relay.js for using OpenAI's Realtime WebSocket console with Langflow in voice (TTS) mode
// relay.js - WebSocket relay server for text-to-speech functionality
import { WebSocketServer, WebSocket as NodeWebSocket } from 'ws';
import { v4 as uuidv4 } from 'uuid';
/**
 * RealtimeRelay - Manages WebSocket connections between a frontend client and Langflow backend.
 * Primarily handles text-to-speech (TTS) functionality with message queueing and audio response handling.
 *
 * Every frontend connection gets its own backend socket, but the outbound message
 * queue, the synthetic "fallback" conversation item and the audio-tracking state
 * live on the instance and are shared across connections (the queue and fallback
 * item are reset whenever a new frontend connects). The relay is therefore meant
 * to serve one active frontend at a time.
 */
export class RealtimeRelay {
  /**
   * @param {object} [options]
   * @param {string} [options.flowId='your_flow_id'] Langflow flow id, appended to `path`.
   * @param {string} [options.sessionId='relay-session'] Session id, appended after the flow id.
   * @param {string} [options.host='127.0.0.1'] Backend host. Hosts containing `localhost` or
   *   `127.0.0.1` are reached over `ws:`, everything else over `wss:`.
   * @param {number} [options.port=7860] Backend port.
   * @param {string} [options.path='/api/v1/voice/ws/flow_tts'] Backend endpoint path.
   * @param {number} [options.pingInterval=30000] Keep-alive ping interval, in milliseconds.
   * @param {number} [options.maxQueueSize=100] Max messages buffered while the backend socket
   *   is not open; the oldest buffered message is dropped once the limit is reached.
   */
  constructor({
    flowId = 'your_flow_id', // replace with the id of your Langflow flow
    sessionId = 'relay-session',
    host = '127.0.0.1',
    port = 7860,
    path = '/api/v1/voice/ws/flow_tts',
    pingInterval = 30000,
    maxQueueSize = 100,
  } = {}) {
    // Connection details
    this.flowId = flowId;
    this.sessionId = sessionId;
    this.path = path;

    // Construct backend URL with protocol detection
    const isLocal = host.includes('localhost') || host.includes('127.0.0.1');
    const protocol = isLocal ? 'ws:' : 'wss:';
    this.backendUrl = `${protocol}//${host}:${port}${path}/${this.flowId}/${this.sessionId}`;
    this.log('🔌 Connecting to backend at:', this.backendUrl);

    // WebSocket server and connection management
    this.wss = null;
    this.messageQueue = [];
    this.pingInterval = pingInterval;
    this.maxQueueSize = maxQueueSize;
    this.activeConnections = new Set();

    // Voice settings with defaults
    this.voiceSettings = {
      voice: 'alloy',
      provider: 'openai',
    };

    // Connection state tracking (used by the construction retry in setupBackendConnection)
    this.connectionAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000; // Start with 1 second delay

    // Audio state tracking
    this.currentAudioTrack = null;
    this.activeAudioItems = new Map(); // Map of item_id to track_id
    // Synthetic conversation item reused for audio events that arrive without an item_id
    this.fallbackItemId = null;
  }

  /**
   * Start the relay's WebSocket server and listen for frontend connections.
   * @param {number} port Port the relay server binds to.
   */
  listen(port) {
    this.wss = new WebSocketServer({
      port,
      perMessageDeflate: true, // Enable compression for better performance
    });
    this.wss.on('connection', this.connectionHandler.bind(this));
    this.log(`🟢 Relay listening at ws://localhost:${port}`);
  }

  /**
   * Handle a new frontend WebSocket connection: create the paired backend socket
   * and wire up message relaying, keep-alive pings and cleanup.
   *
   * This is registered as the server's 'connection' listener, so a rejection
   * would go unhandled; if the backend socket cannot be created the frontend is
   * closed with code 1011 instead.
   * @param {WebSocket} frontendSocket Socket accepted by the relay server.
   */
  async connectionHandler(frontendSocket) {
    this.log(`🔀 Using path: ${this.path}`);

    let backendSocket;
    try {
      backendSocket = await this.setupBackendConnection(frontendSocket);
    } catch (error) {
      this.log(`❌ Could not create backend connection:`, error?.message ?? error);
      frontendSocket.close(1011, 'Backend unavailable');
      return;
    }

    // Reset the shared per-session state and track the connection
    this.messageQueue = [];
    this.fallbackItemId = null;
    this.activeConnections.add(frontendSocket);
    const pingTimer = this.setupKeepAlive(frontendSocket, backendSocket);

    // Set up all event handlers
    this.setupFrontendHandlers(frontendSocket, backendSocket, pingTimer);
    this.setupBackendHandlers(backendSocket, frontendSocket, pingTimer);
  }

  /**
   * Create the backend WebSocket for a frontend connection.
   *
   * Also switches the frontend socket to `arraybuffer` delivery for binary frames.
   * The returned socket is normally still CONNECTING; anything sent before it opens
   * is buffered by `sendToBackend` and flushed by `handleBackendOpen`.
   *
   * NOTE: the retry loop only covers errors thrown synchronously by the `ws`
   * constructor (e.g. a malformed URL). Network and handshake failures are reported
   * later through the socket's 'error' event, which `setupBackendHandlers` turns
   * into a cleanup of both sockets rather than a retry.
   *
   * @param {WebSocket} frontendSocket The frontend side of the pair.
   * @returns {Promise<WebSocket>} The newly created backend socket.
   * @throws The last constructor error once `maxReconnectAttempts` retries are used up.
   */
  async setupBackendConnection(frontendSocket) {
    frontendSocket.binaryType = 'arraybuffer';

    for (let attempt = 0; ; attempt += 1) {
      try {
        const socket = new NodeWebSocket(this.backendUrl, {
          perMessageDeflate: true,
          handshakeTimeout: 5000,
        });
        // Reset retry bookkeeping once the connection actually opens
        socket.on('open', () => {
          this.connectionAttempts = 0;
          this.reconnectDelay = 1000;
        });
        return socket;
      } catch (error) {
        if (attempt >= this.maxReconnectAttempts) {
          throw error;
        }
        this.connectionAttempts = attempt + 1;
        this.log(`⚠️ Connection attempt ${attempt + 1} failed, retrying in ${this.reconnectDelay}ms...`);
        await new Promise((resolve) => setTimeout(resolve, this.reconnectDelay));
        this.reconnectDelay = Math.min(this.reconnectDelay * 2, 10000); // Exponential backoff, max 10 seconds
      }
    }
  }

  /**
   * Ping both sockets every `pingInterval` ms while they are open.
   * @returns {NodeJS.Timeout} Interval handle; cleared by `cleanup`.
   */
  setupKeepAlive(frontendSocket, backendSocket) {
    return setInterval(() => {
      if (frontendSocket.readyState === NodeWebSocket.OPEN) frontendSocket.ping();
      if (backendSocket.readyState === NodeWebSocket.OPEN) backendSocket.ping();
    }, this.pingInterval);
  }

  /**
   * Relay frontend messages to the backend and tear the pair down when the
   * frontend closes or errors.
   */
  setupFrontendHandlers(frontendSocket, backendSocket, pingTimer) {
    frontendSocket.on('message', (data) => {
      this.handleFrontendMessage(data, backendSocket, frontendSocket);
    });

    frontendSocket.on('close', () => {
      this.log(`❌ Frontend closed, closing backend`);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });

    frontendSocket.on('error', (error) => {
      this.log(`⚠️ Frontend socket error:`, error.message);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
  }

  /**
   * Send voice settings on open, relay backend messages to the frontend, and tear
   * the pair down when the backend closes or errors.
   * @param {string|null} [fallbackItemId] Optional item id to use for audio events
   *   without an `item_id`; defaults to the relay's current synthetic item.
   */
  setupBackendHandlers(backendSocket, frontendSocket, pingTimer, fallbackItemId) {
    backendSocket.on('open', () => {
      this.handleBackendOpen(backendSocket, frontendSocket);
    });

    backendSocket.on('message', (data) => {
      this.handleBackendMessage(data, frontendSocket, fallbackItemId);
    });

    backendSocket.on('close', () => {
      this.log(`❌ Backend closed, closing frontend`);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });

    backendSocket.on('error', (error) => {
      this.log(`⚠️ Backend socket error:`, error.message);
      this.cleanup(pingTimer, frontendSocket, backendSocket);
    });
  }

  /**
   * Handle one message from the frontend client.
   *
   * Non-JSON payloads are forwarded verbatim. A `response.interrupt` is handled
   * locally when an item id is available (the message's `item_id`, else the most
   * recently tracked audio item); otherwise it is forwarded like any other message.
   * JSON messages are forwarded as the original text, so nothing is lost to a
   * parse/stringify round trip (e.g. large integers).
   */
  handleFrontendMessage(data, backendSocket, frontendSocket) {
    const text = this.normalize(data);

    let message;
    try {
      message = JSON.parse(text);
    } catch {
      // Not JSON: pass it through untouched
      this.sendToBackend(text, backendSocket);
      return;
    }

    if (message?.type === 'response.interrupt') {
      // For interrupts, we use the last active item ID if none provided
      const itemId = message.item_id || Array.from(this.activeAudioItems.keys()).pop();
      if (itemId) {
        this.handleAudioInterrupt(frontendSocket, backendSocket, itemId);
        return;
      }
    }

    this.log(`➡️ Frontend → Backend:`, this.truncate(text));
    this.sendToBackend(text, backendSocket);
  }

  /**
   * Send a message to the backend, or buffer it while the socket is not open.
   * The buffer holds at most `maxQueueSize` messages; the oldest is dropped first.
   * @param {string|object} message Raw string, or a value to JSON-encode.
   */
  sendToBackend(message, backendSocket) {
    const messageStr = typeof message === 'string' ? message : JSON.stringify(message);
    if (backendSocket.readyState === NodeWebSocket.OPEN) {
      backendSocket.send(messageStr, { compress: true });
      return;
    }
    // Handle message queue overflow
    if (this.messageQueue.length >= this.maxQueueSize) {
      this.log('⚠️ Message queue full - dropping oldest message');
      this.messageQueue.shift();
    }
    this.messageQueue.push(messageStr);
  }

  /**
   * Backend 'open' handler: send the voice settings, then flush buffered messages.
   */
  handleBackendOpen(backendSocket, frontendSocket) {
    const voiceSettingsMsg = JSON.stringify({
      type: 'voice.settings',
      ...this.voiceSettings,
      event_id: `relay_evt_${uuidv4()}`,
    });

    backendSocket.send(voiceSettingsMsg, { compress: true });
    this.log(`🗣️ Sent voice settings: ${this.voiceSettings.voice} / ${this.voiceSettings.provider}`);
    this.log(`✅ Connected to Langflow`);

    this.flushMessageQueue(backendSocket);
  }

  /**
   * Send every buffered message (in order) and empty the buffer.
   */
  flushMessageQueue(backendSocket) {
    if (this.messageQueue.length === 0) return;
    const messages = this.messageQueue.splice(0);
    Promise.all(messages.map((msg) =>
      new Promise((resolve) => backendSocket.send(msg, { compress: true }, resolve)),
    )).then(() => {
      this.log(`🔁 Flushed ${messages.length} queued messages`);
    });
  }

  /**
   * Handle one message from the backend: tag it with an event id if missing,
   * update audio tracking, and forward it to the frontend if that socket is open.
   *
   * Payloads that are not JSON objects are logged and dropped.
   * @param {string|null} [fallbackItemId] Optional override for the synthetic item id
   *   used on audio events without an `item_id`.
   */
  handleBackendMessage(data, frontendSocket, fallbackItemId) {
    const text = this.normalize(data);

    let event;
    try {
      event = JSON.parse(text);
    } catch {
      this.log(`❌ Failed to parse backend message:`, text);
      return;
    }
    if (event === null || typeof event !== 'object') {
      // Valid JSON, but not an event object we can annotate and relay
      this.log(`❌ Failed to parse backend message:`, text);
      return;
    }

    // Ensure all events have an ID
    if (!event.event_id) {
      event.event_id = `relay_${uuidv4()}`;
    }

    if (event.type === 'response.audio.delta') {
      // Remember which track is playing for this item so it can be interrupted
      if (event.track_id && event.item_id) {
        this.activeAudioItems.set(event.item_id, event.track_id);
        this.currentAudioTrack = event.track_id;
      }
      this.handleAudioEvent(event, frontendSocket, fallbackItemId);
    } else if (event.type === 'response.audio.done') {
      // handleAudioCompletion (via handleAudioEvent) drops the item's tracking state
      this.handleAudioEvent(event, frontendSocket, fallbackItemId);
    }

    const payload = JSON.stringify(event);
    if (frontendSocket?.readyState !== NodeWebSocket.OPEN) {
      this.log(`⚠️ Frontend not open, dropping backend event:`, this.truncate(payload));
      return;
    }
    frontendSocket.send(payload, { compress: true });
    this.log(`⬅️ Backend → Frontend:`, this.truncate(payload));
  }

  /**
   * Interrupt playback of an item: tell the frontend to cancel, tell the backend
   * to stop, drop the item's tracking state, then send completion messages.
   */
  handleAudioInterrupt(frontendSocket, backendSocket, itemId) {
    const trackId = this.activeAudioItems.get(itemId);

    // Send audio cancel to frontend first to stop playback
    const frontendCancelMsg = JSON.stringify({
      type: 'response.audio.cancel',
      event_id: `relay_evt_${uuidv4()}`,
      item_id: itemId,
      track_id: trackId || 'current', // Ensure we always send a track_id
      status: 'cancelled',
    });
    if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
      frontendSocket.send(frontendCancelMsg, { compress: true });
    }

    // Then send interrupt message to backend
    const backendInterruptMsg = JSON.stringify({
      type: 'response.interrupt',
      event_id: `relay_evt_${uuidv4()}`,
      item_id: itemId,
      track_id: trackId || 'current',
    });
    if (backendSocket?.readyState === NodeWebSocket.OPEN) {
      backendSocket.send(backendInterruptMsg, { compress: true });
    }

    this.log(`🛑 Cancelled audio playback for item: ${itemId}`);

    this.forgetAudioItem(itemId);
    // Untagged audio arriving after the interrupt must not land in the cancelled item
    if (this.fallbackItemId === itemId) {
      this.fallbackItemId = null;
    }

    this.handleAudioCompletion(frontendSocket, itemId);
  }

  /**
   * Attach an item id to audio events that lack one and finish the item on
   * `response.audio.done` / `response.audio.cancel`.
   *
   * Untagged events share a single synthetic item (created on first use) until
   * that item completes, so one untagged response maps to one conversation item.
   * Mutates `event.item_id` when it is absent.
   * @param {object} event Parsed backend event.
   * @param {string|null} [fallbackItemId] Optional item id to use instead of the
   *   relay's current synthetic item.
   */
  handleAudioEvent(event, frontendSocket, fallbackItemId) {
    if (!event.item_id) {
      this.fallbackItemId = fallbackItemId || this.fallbackItemId
        || this.createConversationItem(frontendSocket);
      event.item_id = this.fallbackItemId;
    }

    if (event.type === 'response.audio.done' || event.type === 'response.audio.cancel') {
      this.handleAudioCompletion(frontendSocket, event.item_id);
      if (event.item_id === this.fallbackItemId) {
        this.fallbackItemId = null;
      }
    }
  }

  /**
   * Create a synthetic assistant conversation item and announce it to the
   * frontend (`conversation.item.created` + `response.created`) if it is open.
   * @returns {string} The new item id.
   */
  createConversationItem(frontendSocket) {
    const itemId = `relay_item_${uuidv4()}`;
    if (frontendSocket?.readyState !== NodeWebSocket.OPEN) {
      return itemId;
    }

    const syntheticMessages = [
      {
        type: 'conversation.item.created',
        event_id: `relay_evt_${uuidv4()}`,
        item: {
          id: itemId,
          object: 'realtime.conversation.item',
          type: 'message',
          role: 'assistant',
          content: [],
          status: 'in_progress',
        },
        previous_item_id: null,
      },
      {
        type: 'response.created',
        event_id: `relay_evt_${uuidv4()}`,
        response: {
          object: 'realtime.response',
          id: `relay_resp_${uuidv4()}`,
          status: 'in_progress',
        },
        item_id: itemId,
      },
    ];
    for (const msg of syntheticMessages) {
      frontendSocket.send(JSON.stringify(msg), { compress: true });
    }
    this.log(`✨ Injected synthetic messages with item_id: ${itemId}`);
    return itemId;
  }

  /**
   * Drop tracking for an item and send the frontend `input_audio_buffer.speech_stopped`
   * plus a `conversation.item.updated` (status `completed`) if it is open.
   */
  handleAudioCompletion(frontendSocket, itemId) {
    this.forgetAudioItem(itemId);

    if (frontendSocket?.readyState !== NodeWebSocket.OPEN) {
      return;
    }

    const now = Date.now();
    const completionMessages = [
      {
        type: 'input_audio_buffer.speech_stopped',
        event_id: `relay_evt_${uuidv4()}`,
        payload: {
          audio_end_ms: now,
          audio: {
            end_ms: now,
          },
        },
      },
      {
        type: 'conversation.item.updated',
        event_id: `relay_evt_${uuidv4()}`,
        item_id: itemId,
        updates: {
          status: 'completed',
        },
      },
    ];
    for (const msg of completionMessages) {
      frontendSocket.send(JSON.stringify(msg), { compress: true });
    }
    this.log(`✅ Completed conversation item: ${itemId}`);
  }

  /**
   * Remove an item from `activeAudioItems`, clearing `currentAudioTrack` when it
   * was that item's track. The track id is read before deleting the entry.
   */
  forgetAudioItem(itemId) {
    const trackId = this.activeAudioItems.get(itemId);
    this.activeAudioItems.delete(itemId);
    if (trackId !== undefined && this.currentAudioTrack === trackId) {
      this.currentAudioTrack = null;
    }
  }

  /**
   * Release a frontend/backend pair: reset shared audio state, stop pings and
   * close whichever sockets are still open. A backend that is still CONNECTING is
   * closed too (aborting its handshake), so it cannot open orphaned later.
   * Safe to call more than once for the same pair.
   */
  cleanup(pingTimer, frontendSocket, backendSocket) {
    try {
      this.currentAudioTrack = null;
      this.activeAudioItems.clear();
      this.fallbackItemId = null;
      clearInterval(pingTimer);
      this.activeConnections.delete(frontendSocket);

      if (frontendSocket?.readyState === NodeWebSocket.OPEN) {
        frontendSocket.close(1000, 'Normal closure');
      }
      const backendState = backendSocket?.readyState;
      if (backendState === NodeWebSocket.OPEN || backendState === NodeWebSocket.CONNECTING) {
        backendSocket.close(1000, 'Normal closure');
      }
    } catch (error) {
      this.log(`⚠️ Error during cleanup:`, error.message);
    }
  }

  /**
   * Decode incoming WebSocket data to a UTF-8 string.
   * Accepts strings, Buffers/typed arrays, ArrayBuffers (delivered for binary
   * frames when `binaryType` is 'arraybuffer') and arrays of Buffer fragments.
   */
  normalize(data) {
    if (typeof data === 'string') return data;
    if (Array.isArray(data)) return Buffer.concat(data).toString('utf-8');
    if (data instanceof ArrayBuffer) return Buffer.from(data).toString('utf-8');
    if (ArrayBuffer.isView(data)) {
      return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('utf-8');
    }
    return String(data);
  }

  /**
   * Truncate long strings for logging.
   */
  truncate(str, len = 180) {
    return str.length > len ? str.slice(0, len) + '…' : str;
  }

  /**
   * Log messages with a consistent prefix.
   */
  log(...args) {
    console.log('[RealtimeRelay]', ...args);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment