Last active
November 15, 2025 18:25
-
-
Save AlexsanderHamir/73b83ada21d9b84d4fe09665cf1745f5 to your computer and use it in GitHub Desktop.
LiteLLM Locust File (`/realtime`)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import time | |
| import uuid | |
| from locust import HttpUser, between, events, task | |
| from websocket import ( | |
| WebSocketBadStatusException, | |
| WebSocketConnectionClosedException, | |
| create_connection, | |
| ) | |
class MyUser(HttpUser):
    """Locust user that stress-tests a LiteLLM ``/v1/realtime`` WebSocket endpoint.

    Each simulated user owns a single long-lived websocket-client connection
    and repeatedly sends ``response.create`` events, accumulating streamed
    ``response.text.delta`` payloads and reporting latency/size/errors to
    Locust via ``events.request.fire``.
    """

    wait_time = between(0.5, 1)  # Random wait time between requests

    def on_start(self):
        """Initialize per-user connection settings; no socket is opened yet."""
        # We intentionally hold on to a single WebSocket connection per Locust user
        # instead of letting the server/session lifecycle manage it. The realtime
        # endpoint we are stress testing keeps session-level caches (e.g. model
        # state, audio buffers, tool execution context) that are keyed to the
        # websocket connection ID. If we constantly reconnect we would measure
        # session creation overhead instead of the steady-state response latency
        # and we would never exercise the cache hit path we want to observe.
        # Managing the lifecycle here ensures we only reconnect when the socket
        # actually drops and we can detect whether the server-side cache behaves
        # as expected for long-lived sessions.
        self.api_key = "sk-1234"
        base_realtime_url = "ws://localhost:4000/v1/realtime"
        self.realtime_model = "fake-openai-realtime-text"
        # Append the model as a query parameter, honoring any query string
        # already present in the base URL.
        connector = "&" if "?" in base_realtime_url else "?"
        self.realtime_url = f"{base_realtime_url}{connector}model={self.realtime_model}"
        # The socket is established lazily by _ensure_connection() on the
        # first task run (and again after any detected drop).
        self.ws = None
        self.session_ready = False

    def on_stop(self):
        """Tear down the websocket when this simulated user stops."""
        self._close_ws()

    def _close_ws(self):
        """Close the current websocket (if any) and reset connection state.

        Close errors are deliberately swallowed: the socket may already be
        dead, and cleanup must never abort the Locust task. The ``finally``
        guarantees ``ws``/``session_ready`` are reset either way.
        """
        ws = getattr(self, "ws", None)
        if ws is None:
            return
        try:
            ws.close()
        except Exception:
            pass
        finally:
            self.ws = None
            self.session_ready = False

    def _ensure_connection(self):
        """Return a connected websocket, building a fresh one if needed.

        On a new connection this blocks until the server has emitted
        ``session.created``, so callers may send requests immediately after
        this returns.
        """
        ws = getattr(self, "ws", None)
        if ws is not None and ws.connected:
            return ws
        # When the connection is gone we proactively tear down whatever state is
        # left and then build a fresh websocket. This gives us deterministic
        # control over when session.created fires and lets us surface reconnect
        # latency in Locust metrics rather than hiding it on the server side.
        self._close_ws()
        ws = create_connection(
            self.realtime_url,
            header=[f"Authorization: Bearer {self.api_key}"],
        )
        self.ws = ws
        self.session_ready = False
        # wait for session.created once per connection
        self._await_session_created(ws)
        self.session_ready = True
        return ws

    def _await_session_created(self, ws):
        """Block until the server sends a ``session.created`` event on *ws*.

        Raises:
            ValueError: if a received frame is not valid JSON (chained from
                the underlying ``json.JSONDecodeError``).
        """
        # Called only right after establishing a fresh websocket connection.
        # NOTE(review): session_ready is always False at this point, so the
        # condition is effectively `while True` until session.created arrives
        # or recv() raises.
        while not self.session_ready:
            message = ws.recv()
            # Skip empty/whitespace-only frames (e.g. keep-alives).
            if not message or not message.strip():
                continue
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON received: {message[:100]}") from e
            if data.get("type") == "session.created":
                return

    @task
    def litellm_fake_realtime(self):
        """Send one ``response.create`` request and drain its streamed reply.

        Accumulates ``response.text.delta`` events until ``response.done``,
        then reports success or failure to Locust's request statistics under
        the name ``fake-openai-realtime-text``.
        """
        # A unique prompt per request so any server-side response caching
        # cannot short-circuit the generation path we want to measure.
        story_prompt = (
            "Tell me a short, two sentence story about the following random id: "
            f"{uuid.uuid4()}."
        )
        response_payload = {
            "type": "response.create",
            "response": {
                "instructions": story_prompt,
                "metadata": {"test_case": "fake-realtime"},
            },
        }
        # The timer covers connection establishment (when one is needed) plus
        # the full streamed response, in milliseconds.
        start_time = time.time()
        accumulated_text: list[str] = []
        response_completed = False
        payload = json.dumps(response_payload)
        try:
            ws = self._ensure_connection()
            try:
                ws.send(payload)
            except (WebSocketConnectionClosedException, BrokenPipeError):
                # The cached socket died between checks; rebuild once and retry
                # the send on the fresh connection.
                self._close_ws()
                ws = self._ensure_connection()
                ws.send(payload)
            # Wait for response events
            while True:
                message = ws.recv()
                if not message or not message.strip():
                    continue
                try:
                    data = json.loads(message)
                except json.JSONDecodeError as e:
                    raise ValueError(f"Invalid JSON received: {message[:100]}") from e
                event_type = data.get("type")
                if event_type == "response.text.delta":
                    # Streamed text chunk; missing "delta" degrades to "".
                    accumulated_text.append(data.get("delta", ""))
                elif event_type == "response.text.done":
                    # Wait for the final response.done event to ensure we drain the stream
                    continue
                elif event_type == "response.done":
                    # Final event - response is complete
                    response_completed = True
                    break
                elif event_type == "response.error":
                    raise RuntimeError(f"Realtime error event: {data}")
        except WebSocketConnectionClosedException as exc:
            # Connection dropped mid-stream. If we already received text (or
            # the response completed), record it as a success with whatever we
            # got; otherwise record the drop as a failure.
            elapsed = (time.time() - start_time) * 1000
            generated_text = "".join(accumulated_text).strip()
            self._close_ws()
            if response_completed or generated_text:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=len(generated_text),
                )
            else:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=0,
                    exception=exc,
                )
        except (
            WebSocketBadStatusException,
            RuntimeError,
            OSError,
            ValueError,
            json.JSONDecodeError,
        ) as exc:
            # Known failure modes (bad handshake, server error event, socket
            # errors, malformed JSON): drop the socket so the next task starts
            # clean, and record the failure.
            elapsed = (time.time() - start_time) * 1000
            self._close_ws()
            events.request.fire(
                request_type="WebSocket",
                name="fake-openai-realtime-text",
                response_time=elapsed,
                response_length=0,
                exception=exc,
            )
        except Exception:
            # Unexpected error: clean up the socket but let Locust's own
            # error handling see the exception.
            self._close_ws()
            raise
        else:
            elapsed = (time.time() - start_time) * 1000
            generated_text = "".join(accumulated_text).strip()
            # If no text was generated, treat it as an error
            if not generated_text:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=0,
                    exception=ValueError("No text was generated - received response.done or response.text.done but no text deltas"),
                )
            else:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=len(generated_text),
                )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment