Skip to content

Instantly share code, notes, and snippets.

@AlexsanderHamir
Last active November 15, 2025 18:25
Show Gist options
  • Select an option

  • Save AlexsanderHamir/73b83ada21d9b84d4fe09665cf1745f5 to your computer and use it in GitHub Desktop.

Select an option

Save AlexsanderHamir/73b83ada21d9b84d4fe09665cf1745f5 to your computer and use it in GitHub Desktop.
LiteLLM Locust File (`/realtime`)
import json
import time
import uuid
from locust import HttpUser, between, events, task
from websocket import (
WebSocketBadStatusException,
WebSocketConnectionClosedException,
create_connection,
)
class MyUser(HttpUser):
    """Locust user that stress-tests the LiteLLM `/v1/realtime` websocket endpoint.

    Each simulated user holds one long-lived websocket connection and repeatedly
    issues `response.create` requests, reporting latency and response size
    through Locust's `events.request` hook.
    """

    wait_time = between(0.5, 1)  # Random wait time between requests

    def on_start(self):
        # We intentionally hold on to a single WebSocket connection per Locust
        # user instead of letting the server/session lifecycle manage it. The
        # realtime endpoint we are stress testing keeps session-level caches
        # (e.g. model state, audio buffers, tool execution context) that are
        # keyed to the websocket connection ID. If we constantly reconnect we
        # would measure session creation overhead instead of the steady-state
        # response latency and we would never exercise the cache hit path we
        # want to observe. Managing the lifecycle here ensures we only
        # reconnect when the socket actually drops and we can detect whether
        # the server-side cache behaves as expected for long-lived sessions.
        self.api_key = "sk-1234"
        base_realtime_url = "ws://localhost:4000/v1/realtime"
        self.realtime_model = "fake-openai-realtime-text"
        connector = "&" if "?" in base_realtime_url else "?"
        self.realtime_url = f"{base_realtime_url}{connector}model={self.realtime_model}"
        self.ws = None
        self.session_ready = False

    def on_stop(self):
        self._close_ws()

    def _close_ws(self):
        """Best-effort close of the websocket; always resets local session state."""
        ws = getattr(self, "ws", None)
        if ws is None:
            return
        try:
            ws.close()
        except Exception:
            # Closing a half-dead socket may raise; we only care that local
            # state is reset so the next task reconnects cleanly.
            pass
        finally:
            self.ws = None
            self.session_ready = False

    def _ensure_connection(self):
        """Return a live websocket, reconnecting and re-handshaking if needed."""
        ws = getattr(self, "ws", None)
        if ws is not None and ws.connected:
            return ws
        # When the connection is gone we proactively tear down whatever state
        # is left and then build a fresh websocket. This gives us deterministic
        # control over when session.created fires and lets us surface reconnect
        # latency in Locust metrics rather than hiding it on the server side.
        self._close_ws()
        ws = create_connection(
            self.realtime_url,
            header=[f"Authorization: Bearer {self.api_key}"],
        )
        self.ws = ws
        self.session_ready = False
        # wait for session.created once per connection
        self._await_session_created(ws)
        self.session_ready = True
        return ws

    @staticmethod
    def _recv_json(ws):
        """Block until a non-empty frame arrives on *ws* and return it parsed.

        Empty / whitespace-only frames (keep-alives) are skipped.

        Raises:
            ValueError: if a frame is not valid JSON (chains the original
                ``json.JSONDecodeError``).
        """
        while True:
            message = ws.recv()
            if not message or not message.strip():
                continue
            try:
                return json.loads(message)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid JSON received: {message[:100]}") from e

    def _await_session_created(self, ws):
        # Called only right after establishing a fresh websocket connection.
        # NOTE: this previously looped on `not self.session_ready`, but that
        # flag is always False at this point and never set inside the loop, so
        # an explicit infinite loop with a return is the honest form.
        while True:
            if self._recv_json(ws).get("type") == "session.created":
                return

    @task
    def litellm_fake_realtime(self):
        """Send one `response.create` request and record the outcome in Locust.

        Success requires draining the event stream through `response.done` and
        having accumulated at least one text delta; everything else is reported
        to Locust as a failed request with the triggering exception attached.
        """
        story_prompt = (
            "Tell me a short, two sentence story about the following random id: "
            f"{uuid.uuid4()}."
        )
        response_payload = {
            "type": "response.create",
            "response": {
                "instructions": story_prompt,
                "metadata": {"test_case": "fake-realtime"},
            },
        }
        start_time = time.time()
        accumulated_text: list[str] = []
        response_completed = False
        payload = json.dumps(response_payload)
        try:
            ws = self._ensure_connection()
            try:
                ws.send(payload)
            except (WebSocketConnectionClosedException, BrokenPipeError):
                # Stale socket: rebuild the connection once and retry the send.
                self._close_ws()
                ws = self._ensure_connection()
                ws.send(payload)
            # Wait for response events
            while True:
                data = self._recv_json(ws)
                event_type = data.get("type")
                if event_type == "response.text.delta":
                    accumulated_text.append(data.get("delta", ""))
                elif event_type == "response.text.done":
                    # Wait for the final response.done event to ensure we
                    # drain the stream
                    continue
                elif event_type == "response.done":
                    # Final event - response is complete
                    response_completed = True
                    break
                elif event_type == "response.error":
                    raise RuntimeError(f"Realtime error event: {data}")
        except WebSocketConnectionClosedException as exc:
            # Server dropped us mid-stream. If we already received text (or
            # the done marker) count it as success; otherwise report a failure
            # with the close exception attached.
            elapsed = (time.time() - start_time) * 1000
            generated_text = "".join(accumulated_text).strip()
            self._close_ws()
            if response_completed or generated_text:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=len(generated_text),
                )
            else:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=0,
                    exception=exc,
                )
        except (
            WebSocketBadStatusException,
            RuntimeError,
            OSError,
            ValueError,  # also covers json.JSONDecodeError (a ValueError subclass)
        ) as exc:
            elapsed = (time.time() - start_time) * 1000
            self._close_ws()
            events.request.fire(
                request_type="WebSocket",
                name="fake-openai-realtime-text",
                response_time=elapsed,
                response_length=0,
                exception=exc,
            )
        except Exception:
            # Unknown failure: drop the socket so the next task reconnects,
            # then let Locust's default error handling report it.
            self._close_ws()
            raise
        else:
            elapsed = (time.time() - start_time) * 1000
            generated_text = "".join(accumulated_text).strip()
            # If no text was generated, treat it as an error
            if not generated_text:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=0,
                    exception=ValueError("No text was generated - received response.done or response.text.done but no text deltas"),
                )
            else:
                events.request.fire(
                    request_type="WebSocket",
                    name="fake-openai-realtime-text",
                    response_time=elapsed,
                    response_length=len(generated_text),
                )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment