| """LLM model benchmark for SR2 agents use cases. | |
| Tests models across the two roles they fill: | |
| - FAST model: memory extraction, summarization, intent detection | |
| - MAIN model: conversation, tool calling, personality adherence | |
| Usage: | |
| python scripts/benchmark_llm.py \ | |
| --model ollama/qwen3.5:27b \ | |
| --model ollama/llama3.1:8b \ | |
| --api-base http://192.168.50.34:11440 \ | |
| --role fast | |
| # Test both roles with the same model: | |
| python scripts/benchmark_llm.py \ | |
| --model ollama/qwen3.5:27b \ | |
| --api-base http://192.168.50.34:11440 \ | |
| --role all | |
| # Multiple models, multiple endpoints: | |
| python scripts/benchmark_llm.py \ | |
| --model ollama/qwen3.5:27b@http://192.168.50.34:11440 \ | |
| --model ollama/qwen3.5:9b@http://192.168.50.34:11435 \ | |
| --role all | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import re | |
| import sys | |
| import time | |
| from dataclasses import dataclass, field | |
| # --------------------------------------------------------------------------- | |
| # Test cases | |
| # --------------------------------------------------------------------------- | |
| KEY_SCHEMA_PREFIXES = [ | |
| "user.preference", | |
| "user.contact", | |
| "project", | |
| "decision", | |
| "tasks", | |
| ] | |
| MEMORY_EXTRACTION_SYSTEM = ( | |
| "You extract structured memories from conversations. " | |
| "Output ONLY a JSON array. No markdown, no explanation." | |
| ) | |
| MEMORY_EXTRACTION_PROMPT_TEMPLATE = """Extract durable, personal facts from this conversation turn. | |
| Output ONLY a JSON array. No markdown, no explanation. | |
| Each object: {{"key": "...", "value": "...", "memory_type": "identity|semi_stable|dynamic", "confidence_source": "explicit_statement|direct_answer|contextual_mention|inferred|offhand"}} | |
| Keys MUST use one of these dot-notation prefixes: | |
| - user.preference: User preferences and settings | |
| - user.contact: Contact information and relationships | |
| - project: Project details and status | |
| - decision: Decisions made during conversations | |
| - tasks: Tasks that the user needs to do | |
| Format: <prefix>.<specific_attribute> in lowercase with dots, no spaces. | |
| EXTRACT: Personal facts, preferences, decisions, goals, relationships, and stable context about the user. | |
| DO NOT EXTRACT: | |
| - Tool call details, function names, search queries, or raw JSON/code | |
| - Transient stats (GitHub stars/forks, API counts, prices that change) | |
| - Empty, null, or placeholder values | |
| - Temporary task statuses that will be irrelevant next session | |
| Max 5 memories. If nothing durable to extract, return []. | |
| Conversation turn: | |
| {conversation}""" | |
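# For reference, a conforming extraction for the "explicit_remember" case below
# would look roughly like this (illustrative only, not output from any model):
# [
#   {"key": "user.contact.telegram", "value": "@usercodes",
#    "memory_type": "semi_stable", "confidence_source": "explicit_statement"},
#   {"key": "user.preference.timezone", "value": "UTC-5",
#    "memory_type": "semi_stable", "confidence_source": "explicit_statement"}
# ]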
@dataclass
class ExtractionTestCase:
    name: str
    conversation: str
    expected_keys: list[str]  # Keys (or key prefixes) that SHOULD appear
    forbidden_keys: list[str]  # Keys that MUST NOT appear (noise)
    expected_count_range: tuple[int, int]  # (min, max) memories to extract


EXTRACTION_CASES: list[ExtractionTestCase] = [
    ExtractionTestCase(
        name="explicit_preferences",
        conversation=(
            "User: I prefer dark mode in all my apps, and I always use Python for scripting.\n\n"
            "Assistant: Got it! I'll keep that in mind."
        ),
        expected_keys=["user.preference.dark_mode", "user.preference"],
        forbidden_keys=[],
        expected_count_range=(1, 3),
    ),
    ExtractionTestCase(
        name="project_status",
        conversation=(
            "User: The SR2 project is almost done, just missing the retrieval layer.\n\n"
            "Assistant: That's great progress! What's blocking the retrieval layer?"
        ),
        expected_keys=["project"],
        forbidden_keys=[],
        expected_count_range=(1, 2),
    ),
    ExtractionTestCase(
        name="no_extract_noise",
        conversation=(
            "User: Hey can you search for the latest React docs?\n\n"
            'Assistant: <tool_call>{"name": "searxng_web_search", '
            '"arguments": {"query": "React docs 2025"}}</tool_call>'
        ),
        expected_keys=[],
        forbidden_keys=["tool_call", "search", "function"],
        expected_count_range=(0, 1),
    ),
    ExtractionTestCase(
        name="explicit_remember",
        conversation=(
            "User: Remember that my Telegram handle is @usercodes and I'm in the UTC-5 timezone.\n\n"
            "Assistant: Saved! I'll remember your Telegram handle and timezone."
        ),
        expected_keys=["user.contact", "user.preference"],
        forbidden_keys=[],
        expected_count_range=(1, 3),
    ),
    ExtractionTestCase(
        name="mixed_content",
        conversation=(
            "User: I decided to use PostgreSQL for the project database instead of SQLite. "
            "Also can you check what the current price of GPT-4o is?\n\n"
            "Assistant: PostgreSQL is a solid choice for production. "
            "As of now, GPT-4o costs $5 per 1M input tokens."
        ),
        expected_keys=["decision"],
        forbidden_keys=["price", "stars", "forks"],
        expected_count_range=(1, 2),
    ),
    ExtractionTestCase(
        name="empty_turn",
        conversation=(
            "User: What time is it?\n\nAssistant: I don't have access to real-time data, sorry!"
        ),
        expected_keys=[],
        forbidden_keys=[],
        expected_count_range=(0, 0),
    ),
]

SYSTEM_PROMPT = """You are a personal assistant.
You are persistent — not a one-off chatbot. You remember, learn, and grow alongside the user.
You are their sidekick, their loyal partner and companion.
PERSONALITY:
Warm, playful, spunky, witty, expressive, friendly, intelligent with sass. A genuinely capable friend. You use emoji naturally (not excessively), you're a little silly sometimes, and you celebrate wins. You have opinions and personality, and are encouraged to show them, but never make it about you when the user needs help. You swear casually. You match the user's register. Never robotic or formal. You're not afraid to be direct when something matters, or even to curse.
COMMUNICATION RULES:
- Concise by default. They're usually on their phone.
- Match their energy. Chatty? Be chatty. Quick question? Quick answer (but always warm/friendly).
- Complex topic? Short version first, offer to elaborate.
- Never patronizing. They're smart and technical. No generic filler.
- Minimal formatting. No walls of bullet points.
THINGS TO NEVER DO:
- Don't start messages with phrases like "Of course!", "Absolutely!", or "Sure thing!"
- Don't apologize for things that aren't your fault.
- Don't explain your own limitations unless directly asked. Just do your best.
- Don't hedge everything. Have a take.
- Don't be sycophantic. Be real."""


@dataclass
class ConversationTestCase:
    name: str
    user_message: str
    check_forbidden: list[str]  # Phrases that must not appear in response
    check_concise: bool  # Assert response is <= 150 words
    check_has_opinion: bool  # Response should express a concrete recommendation
    description: str


CONVERSATION_CASES: list[ConversationTestCase] = [
    ConversationTestCase(
        name="simple_question",
        user_message="hey what's better for async python, asyncio or trio?",
        check_forbidden=["Of course!", "Absolutely!", "Sure thing!", "Great question"],
        check_concise=True,
        check_has_opinion=True,
        description="Simple technical question, expect concise opinionated answer",
    ),
    ConversationTestCase(
        name="no_sycophancy",
        user_message="I'm thinking of rewriting everything in Rust.",
        check_forbidden=["Of course!", "Absolutely!", "Great idea", "Sure!", "Certainly!"],
        check_concise=False,
        check_has_opinion=True,
        description="Should push back or engage critically, not just validate",
    ),
    ConversationTestCase(
        name="short_greeting",
        user_message="hey",
        check_forbidden=["Of course!", "Absolutely!", "How can I assist you today"],
        check_concise=True,
        check_has_opinion=False,
        description="One-word greeting, response should be warm and brief",
    ),
    ConversationTestCase(
        name="technical_quick",
        user_message="what's the difference between a list and a tuple in python, quick",
        check_forbidden=["Of course!", "Absolutely!", "Certainly!"],
        check_concise=True,
        check_has_opinion=False,
        description="Quick technical question, must be brief",
    ),
    ConversationTestCase(
        name="no_self_apology",
        user_message="you got that wrong earlier",
        check_forbidden=["I apologize", "I'm sorry for", "I deeply apologize"],
        check_concise=True,
        check_has_opinion=False,
        description="Should not over-apologize for being wrong",
    ),
]

# ---------------------------------------------------------------------------
# Long conversation test
# ---------------------------------------------------------------------------

LONG_CONV_FORBIDDEN = [
    "Of course!",
    "Absolutely!",
    "Sure thing!",
    "Great question",
    "I apologize",
    "I'm sorry for",
    "I deeply apologize",
    "How can I assist",
    "Certainly!",
    "As an AI",
]


@dataclass
class LongConvTurn:
    user: str
    # Single fact recall: (label, expected_substring_in_response)
    recall_check: tuple[str, str] | None = None
    # Comprehensive recall: (list_of_expected, min_matches_required)
    recall_multi: tuple[list[str], int] | None = None


LONG_CONVERSATION_TURNS: list[LongConvTurn] = [
    # --- Turns 1–20: normal chat + fact planting ---
    LongConvTurn("hey! just wanted to catch up, what's good?"),
    LongConvTurn("so I've been heads down on this new startup idea"),
    LongConvTurn(
        "the project is called Helix — document intelligence platform"
    ),  # FACT 1: Helix
    LongConvTurn("what do you think about the name?"),
    LongConvTurn(
        "it's going to help companies search through their internal docs way faster"
    ),
    LongConvTurn("quick — best python web framework for building an API?"),
    LongConvTurn(
        "makes sense, we're already using FastAPI for the backend and React on the frontend"
    ),  # FACT 2: FastAPI
    LongConvTurn("what's the easiest way to add rate limiting to FastAPI?"),
    LongConvTurn("yeah I'll try slowapi, thanks"),
    LongConvTurn("do you think we should add websocket support now or later?"),
    LongConvTurn("we're targeting enterprise customers, so security is the top priority"),
    LongConvTurn("our target launch date is Q3 of this year"),  # FACT 3: Q3
    LongConvTurn("what's the simplest way to implement JWT auth in FastAPI?"),
    LongConvTurn("cool, we'll use python-jose then"),
    LongConvTurn(
        "my co-founder Sofia is handling all the business and sales side"
    ),  # FACT 4: Sofia
    LongConvTurn(
        "she says we need a landing page ASAP — any quick stack recommendations?"
    ),
    LongConvTurn("quick — list vs tuple in python?"),
    LongConvTurn("got it"),
    LongConvTurn("what do you think about using redis for caching?"),
    LongConvTurn(
        "for the main database we went with PostgreSQL by the way"
    ),  # FACT 5: PostgreSQL
    # --- Turns 21–30: filler ---
    LongConvTurn("how do I write a good README?"),
    LongConvTurn("quick — REST vs GraphQL, what's the core difference?"),
    LongConvTurn("our search feature uses embedding vectors for semantic similarity"),
    LongConvTurn(
        "what's a good Python library for working with embeddings and vector search?"
    ),
    LongConvTurn(
        "what's my project called again?", recall_check=("project_name", "helix")
    ),  # RECALL 1
    LongConvTurn("right, Helix. anyway we just got our first beta user!"),
    LongConvTurn("what metrics should I be tracking for a B2B SaaS?"),
    LongConvTurn("MRR, churn, NPS — got it"),
    LongConvTurn("quick — what does async/await actually do under the hood?"),
    LongConvTurn("makes sense, thanks"),
    # --- Turns 31–40: filler ---
    LongConvTurn("we're hitting a weird bug with our document parser"),
    LongConvTurn("it chokes on PDFs that contain embedded images — any ideas?"),
    LongConvTurn("ok I'll try PyMuPDF"),
    LongConvTurn("Sofia just landed us a VC meeting next week!"),
    LongConvTurn(
        "what tech stack am I using again?", recall_check=("stack", "fastapi")
    ),  # RECALL 2
    LongConvTurn("yep FastAPI and React. what should I prep for a VC meeting?"),
    LongConvTurn("traction metrics and a sharp problem statement — noted"),
    LongConvTurn("quick — what is the CAP theorem?"),
    LongConvTurn("ok that makes sense"),
    LongConvTurn("we just pushed our first real feature to production 🎉"),
    # --- Turns 41–50: filler ---
    LongConvTurn("now I have to write API documentation, ugh"),
    LongConvTurn("wait FastAPI already generates Swagger docs automatically right?"),
    LongConvTurn("nice, that saves a lot of time"),
    LongConvTurn("quick — difference between authorization and authentication?"),
    LongConvTurn(
        "when are we launching again?", recall_check=("launch", "q3")
    ),  # RECALL 3
    LongConvTurn("right Q3. we need to move faster then"),
    LongConvTurn("should we hire a frontend contractor or keep building it ourselves?"),
    LongConvTurn("yeah a contractor makes sense for now"),
    LongConvTurn("quick — what is docker compose for?"),
    LongConvTurn("already using it, just checking 😏"),
    # --- Turns 51–60: late recall + personality stress ---
    LongConvTurn("we have a big potential enterprise deal on the table"),
    LongConvTurn(
        "who's handling the sales side for us again?", recall_check=("cofounder", "sofia")
    ),  # RECALL 4
    LongConvTurn(
        "right Sofia is crushing it. what should I prep for closing an enterprise deal?"
    ),
    LongConvTurn("security questionnaires and SOC 2 compliance — noted"),
    LongConvTurn("hey I'm exhausted, I just need to vent for a sec"),
    LongConvTurn(
        "it's just a lot, you know? building something real from scratch is hard"
    ),
    LongConvTurn(
        "ok I'm good. what database are we running on again?",
        recall_check=("database", "postgres"),
    ),  # RECALL 5
    LongConvTurn("right, postgres — is pgvector good for storing the embedding vectors?"),
    LongConvTurn("yeah I know, we're already using it. ok last question for today"),
    LongConvTurn(  # RECALL 6 — comprehensive
        "give me a quick summary of what you know about my startup project",
        recall_multi=(["helix", "fastapi", "q3", "sofia", "postgres"], 4),
    ),
]


def _linear_slope(values: list[float]) -> float:
    """Slope of a least-squares linear fit (y per index step)."""
    n = len(values)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2.0
    mean_y = sum(values) / n
    num = sum((i - mean_x) * (v - mean_y) for i, v in enumerate(values))
    den = sum((i - mean_x) ** 2 for i in range(n))
    return num / den if den > 0 else 0.0
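# Quick worked example (assumed numbers, not real measurements): per-turn
# latencies of [1000.0, 1100.0, 1200.0, 1300.0] ms fit a line with slope 100.0,
# i.e. roughly 100 ms of extra latency per turn as the context grows:
#   _linear_slope([1000.0, 1100.0, 1200.0, 1300.0]) == 100.0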
def score_long_turn(turn: LongConvTurn, response: str) -> dict:
    """Score a single long-conversation turn."""
    response_lower = response.lower()
    forbidden_hits = [p for p in LONG_CONV_FORBIDDEN if p.lower() in response_lower]
    personality_ok = len(forbidden_hits) == 0
    recall_ok: bool | None = None
    if turn.recall_check:
        _, expected = turn.recall_check
        recall_ok = expected.lower() in response_lower
    elif turn.recall_multi:
        expected_list, min_matches = turn.recall_multi
        hits = sum(1 for e in expected_list if e.lower() in response_lower)
        recall_ok = hits >= min_matches
    if recall_ok is not None:
        score = (1.0 if personality_ok else 0.0) * 0.4 + (1.0 if recall_ok else 0.0) * 0.6
    else:
        score = 1.0 if personality_ok else 0.0
    return {
        "personality_ok": personality_ok,
        "forbidden_hits": forbidden_hits,
        "recall_ok": recall_ok,
        "word_count": len(response.split()),
        "score": score,
    }


# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------


def strip_think_tags(text: str) -> str:
    """Remove <think>...</think> blocks from model output."""
    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()


def parse_json_array(text: str) -> list | None:
    """Extract a JSON array from model output.

    Handles: markdown fences, preamble text, think-tag leftovers, trailing prose.
    Scans for the first '[' and tries progressively shorter substrings.
    """
    # Strip markdown fences and stray backticks
    text = re.sub(r"```(?:json)?\s*", "", text).strip().rstrip("`").strip()
    # Fast path: whole text is already a valid array
    try:
        result = json.loads(text)
        if isinstance(result, list):
            return result
    except json.JSONDecodeError:
        pass
    # Scan for the first '[' and try to extract the array from there
    start = text.find("[")
    if start == -1:
        return None
    chunk = text[start:]
    # Walk backwards from the end to find a valid closing ']'
    end = len(chunk)
    while end > 0:
        try:
            result = json.loads(chunk[:end])
            if isinstance(result, list):
                return result
        except json.JSONDecodeError:
            pass
        end = chunk.rfind("]", 0, end)
        if end == -1:
            break
        end += 1  # include the ']'
    return None
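# Illustrative inputs this parser is meant to survive (made-up outputs, not from
# a real run):
#   parse_json_array('```json\n[{"key": "user.preference.theme", "value": "dark"}]\n```')
#     -> [{"key": "user.preference.theme", "value": "dark"}]
#   parse_json_array('Here you go: [{"key": "project.helix.db", "value": "postgres"}] Hope that helps!')
#     -> [{"key": "project.helix.db", "value": "postgres"}]
# Anything without a recoverable JSON array returns None.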
def score_extraction(case: ExtractionTestCase, raw_output: str) -> dict:
    """Score a single memory extraction response."""
    output = strip_think_tags(raw_output)
    has_think_leak = bool(re.search(r"<think>", raw_output, re.IGNORECASE))
    items = parse_json_array(output)
    valid_json = items is not None
    if not valid_json:
        return {
            "valid_json": False,
            "think_leak": has_think_leak,
            "count_ok": False,
            "schema_ok": False,
            "precision": 0.0,
            "recall": 0.0,
            "score": 0.0,
        }
    count = len(items)
    min_c, max_c = case.expected_count_range
    count_ok = min_c <= count <= max_c
    # Key schema adherence — each key must start with a known prefix
    valid_keys = []
    for item in items:
        k = str(item.get("key", "")).strip().lower()
        if any(k.startswith(p) for p in KEY_SCHEMA_PREFIXES):
            valid_keys.append(k)
    schema_ok = len(valid_keys) == len(items) if items else True
    # Precision: none of the forbidden key substrings appear
    all_keys = [str(i.get("key", "")).lower() for i in items]
    forbidden_hit = any(
        any(f.lower() in k for k in all_keys) for f in case.forbidden_keys
    )
    precision = 0.0 if forbidden_hit else 1.0
    # Recall: expected key prefixes appear in at least one extracted key
    recall_hits = 0
    for expected in case.expected_keys:
        if any(expected.lower() in k for k in all_keys):
            recall_hits += 1
    recall = recall_hits / len(case.expected_keys) if case.expected_keys else 1.0
    # Composite score
    score = (
        (1.0 if valid_json else 0.0) * 0.25
        + (1.0 if count_ok else 0.5) * 0.15
        + (1.0 if schema_ok else 0.0) * 0.25
        + precision * 0.20
        + recall * 0.15
    )
    if has_think_leak:
        score *= 0.85  # Penalize leaked think tokens
    return {
        "valid_json": valid_json,
        "think_leak": has_think_leak,
        "count_ok": count_ok,
        "schema_ok": schema_ok,
        "precision": precision,
        "recall": recall,
        "score": score,
    }
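# Worked example of the composite (hypothetical numbers, not a real model run):
# valid JSON (0.25) + count in range (0.15) + every key on-schema (0.25)
# + no forbidden substrings (1.0 * 0.20) + 1 of 2 expected prefixes matched
# (0.5 * 0.15 = 0.075) gives 0.925; a leaked <think> block would scale that
# by 0.85 down to roughly 0.79.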
def score_conversation(case: ConversationTestCase, response: str) -> dict:
    """Score a single conversation response."""
    response_lower = response.lower()
    word_count = len(response.split())
    forbidden_hit = [p for p in case.check_forbidden if p.lower() in response_lower]
    forbidden_ok = len(forbidden_hit) == 0
    concise_ok = word_count <= 150 if case.check_concise else True
    # Simple heuristic for "has opinion": contains a recommendation word
    opinion_words = [
        "recommend",
        "prefer",
        "better",
        "worse",
        "go with",
        "use ",
        "i'd",
        "i think",
        "honestly",
    ]
    has_opinion = any(w in response_lower for w in opinion_words)
    opinion_ok = has_opinion if case.check_has_opinion else True
    score = (
        (1.0 if forbidden_ok else 0.0) * 0.50
        + (1.0 if concise_ok else 0.5) * 0.30
        + (1.0 if opinion_ok else 0.5) * 0.20
    )
    return {
        "forbidden_ok": forbidden_ok,
        "forbidden_hits": forbidden_hit,
        "word_count": word_count,
        "concise_ok": concise_ok,
        "opinion_ok": opinion_ok,
        "score": score,
    }
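# Worked example (hypothetical): a reply that avoids every forbidden phrase
# (0.50), runs 180 words on a check_concise case (0.5 * 0.30 = 0.15), and says
# "i'd go with trio" so the opinion heuristic passes (0.20) scores 0.85.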
# ---------------------------------------------------------------------------
# Model runner
# ---------------------------------------------------------------------------


@dataclass
class ModelConfig:
    name: str
    api_base: str | None = None


@dataclass
class BenchmarkResult:
    model: str
    role: str
    latencies_ms: list[float] = field(default_factory=list)
    tokens_per_sec: list[float] = field(default_factory=list)
    output_tokens: list[int] = field(default_factory=list)
    case_scores: list[dict] = field(default_factory=list)
    case_names: list[str] = field(default_factory=list)
    errors: int = 0

    @property
    def avg_latency_ms(self) -> float:
        return sum(self.latencies_ms) / len(self.latencies_ms) if self.latencies_ms else 0.0

    @property
    def p90_latency_ms(self) -> float:
        if not self.latencies_ms:
            return 0.0
        s = sorted(self.latencies_ms)
        idx = max(0, int(len(s) * 0.9) - 1)
        return s[idx]

    @property
    def avg_tps(self) -> float:
        return (
            sum(self.tokens_per_sec) / len(self.tokens_per_sec)
            if self.tokens_per_sec
            else 0.0
        )

    @property
    def avg_score(self) -> float:
        scores = [c.get("score", 0.0) for c in self.case_scores]
        return sum(scores) / len(scores) if scores else 0.0


async def unload_all_models(api_bases: list[str]) -> None:
    """Unload all currently loaded models from each Ollama endpoint.

    Uses GET /api/ps to list running models, then POST /api/chat with
    keep_alive=0 to evict each one from memory.
    """
    import httpx

    unique_bases = list(dict.fromkeys(b.rstrip("/") for b in api_bases if b))
    for base in unique_bases:
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(f"{base}/api/ps")
                resp.raise_for_status()
                running = resp.json().get("models", [])
        except Exception as e:
            print(f" Could not list models at {base}: {e}", file=sys.stderr)
            continue
        if not running:
            print(f" {base}: no models loaded")
            continue
        print(f" {base}: unloading {len(running)} model(s) ...", end="", flush=True)
        async with httpx.AsyncClient(timeout=30) as client:
            for m in running:
                name = m.get("name") or m.get("model", "")
                if not name:
                    continue
                try:
                    await client.post(
                        f"{base}/api/chat",
                        json={"model": name, "messages": [], "keep_alive": 0},
                    )
                except Exception:
                    pass
        print(" done")
async def call_model(
    model_cfg: ModelConfig,
    system: str,
    user: str,
    max_tokens: int = 1024,
    think: bool = True,
    num_ctx: int | None = None,
    history: list[dict] | None = None,
) -> tuple[str, float, float, int]:
    """Call model, return (content, latency_ms, tokens_per_sec, output_tokens)."""
    import litellm

    messages = [{"role": "system", "content": system}]
    if history:
        messages.extend(history)
    messages.append({"role": "user", "content": user})
    kwargs: dict = {
        "model": model_cfg.name,
        "messages": messages,
        "max_tokens": max_tokens,
        "timeout": 1200,
        "temperature": 0.1,
    }
    if model_cfg.api_base:
        kwargs["api_base"] = model_cfg.api_base
    extra: dict = {}
    if not think:
        extra["think"] = False
    if num_ctx is not None:
        extra["num_ctx"] = num_ctx
    if extra:
        kwargs["extra_body"] = extra
    t0 = time.perf_counter()
    # print(f"Calling model with {kwargs}")
    response = await litellm.acompletion(**kwargs)
    elapsed_ms = (time.perf_counter() - t0) * 1000
    content = response.choices[0].message.content or ""
    out_tokens = response.usage.completion_tokens or 0
    tps = (out_tokens / (elapsed_ms / 1000)) if elapsed_ms > 0 else 0.0
    return content, elapsed_ms, tps, out_tokens
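# Minimal usage sketch (model name and endpoint are assumptions matching the
# docstring examples, not values the script requires):
#   cfg = ModelConfig(name="ollama/qwen3.5:27b", api_base="http://192.168.50.34:11440")
#   text, ms, tps, n_tokens = await call_model(cfg, "You are terse.", "Say hi")
# When a history list is passed, it is spliced between the system prompt and the
# new user turn, which is how the long-conversation benchmark carries the whole
# session forward on every call.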
async def run_extraction_benchmark(
    model_cfg: ModelConfig, verbose: bool, no_think: bool = False
) -> BenchmarkResult:
    result = BenchmarkResult(model=model_cfg.name, role="fast")
    system = MEMORY_EXTRACTION_SYSTEM
    no_think_prefix = "/no_think\n" if no_think else ""
    i = 0
    for case in EXTRACTION_CASES:
        prompt = no_think_prefix + MEMORY_EXTRACTION_PROMPT_TEMPLATE.format(
            conversation=case.conversation
        )
        try:
            i += 1
            print(f"\n--- extraction turn {i}")
            content, latency_ms, tps, out_tokens = await call_model(
                model_cfg,
                system,
                prompt,
                max_tokens=512,
                think=not no_think,
                num_ctx=2048,
            )
        except Exception as e:
            print(f" ERROR [{case.name}]: {e}", file=sys.stderr)
            result.errors += 1
            continue
        scores = score_extraction(case, content)
        result.latencies_ms.append(latency_ms)
        result.tokens_per_sec.append(tps)
        result.output_tokens.append(out_tokens)
        result.case_scores.append(scores)
        result.case_names.append(case.name)
        if verbose:
            _print_extraction_verbose(case, content, scores, latency_ms, tps)
    return result


async def run_conversation_benchmark(
    model_cfg: ModelConfig, verbose: bool, no_think: bool = False
) -> BenchmarkResult:
    result = BenchmarkResult(model=model_cfg.name, role="main")
    system = SYSTEM_PROMPT
    no_think_prefix = "/no_think\n" if no_think else ""
    i = 0
    for case in CONVERSATION_CASES:
        try:
            i += 1
            print(f"\n--- conversation turn {i}")
            content, latency_ms, tps, out_tokens = await call_model(
                model_cfg,
                system,
                no_think_prefix + case.user_message,
                max_tokens=512,
                think=not no_think,
            )
        except Exception as e:
            print(f" ERROR [{case.name}]: {e}", file=sys.stderr)
            result.errors += 1
            continue
        scores = score_conversation(case, content)
        result.latencies_ms.append(latency_ms)
        result.tokens_per_sec.append(tps)
        result.output_tokens.append(out_tokens)
        result.case_scores.append(scores)
        result.case_names.append(case.name)
        if verbose:
            _print_conversation_verbose(case, content, scores, latency_ms, tps)
    return result


async def run_long_conversation_benchmark(
    model_cfg: ModelConfig, verbose: bool, no_think: bool = False
) -> BenchmarkResult:
    result = BenchmarkResult(model=model_cfg.name, role="long")
    system = SYSTEM_PROMPT
    no_think_prefix = "/no_think\n" if no_think else ""
    history: list[dict] = []
    for i, turn in enumerate(LONG_CONVERSATION_TURNS):
        user_msg = no_think_prefix + turn.user
        try:
            print(
                f"\n--- long conv turn {i + 1:02d}/{len(LONG_CONVERSATION_TURNS)}",
                end="",
                flush=True,
            )
            content, latency_ms, tps, out_tokens = await call_model(
                model_cfg,
                system,
                user_msg,
                max_tokens=150,
                think=not no_think,
                num_ctx=16384,
                history=history,
            )
        except Exception as e:
            print(f" ERROR [turn {i + 1}]: {e}", file=sys.stderr)
            result.errors += 1
            history.append({"role": "user", "content": turn.user})
            history.append({"role": "assistant", "content": "..."})
            result.case_scores.append(
                {
                    "personality_ok": False,
                    "recall_ok": None,
                    "score": 0.0,
                    "word_count": 0,
                    "forbidden_hits": [],
                }
            )
            result.case_names.append(f"turn_{i + 1:02d}")
            result.latencies_ms.append(0.0)
            result.tokens_per_sec.append(0.0)
            result.output_tokens.append(0)
            continue
        history.append({"role": "user", "content": turn.user})
        history.append({"role": "assistant", "content": content})
        scores = score_long_turn(turn, content)
        result.latencies_ms.append(latency_ms)
        result.tokens_per_sec.append(tps)
        result.output_tokens.append(out_tokens)
        result.case_scores.append(scores)
        result.case_names.append(f"turn_{i + 1:02d}")
        if verbose:
            _print_long_turn_verbose(turn, content, scores, latency_ms, tps, i + 1)
        else:
            recall_marker = ""
            if scores["recall_ok"] is True:
                recall_marker = " ✓recall"
            elif scores["recall_ok"] is False:
                recall_marker = " ✗recall"
            personality_marker = "" if scores["personality_ok"] else " ✗personality"
            print(f" {latency_ms:.0f}ms {tps:.0f}tok/s{recall_marker}{personality_marker}")
    return result


# ---------------------------------------------------------------------------
# Output helpers
# ---------------------------------------------------------------------------


def _print_extraction_verbose(case, content, scores, latency_ms, tps):
    think_warn = " ⚠ THINK LEAK" if scores["think_leak"] else ""
    json_ok = "✓" if scores["valid_json"] else "✗"
    schema = "✓" if scores["schema_ok"] else "✗"
    print(
        f"\n [{case.name}] score={scores['score']:.2f} json={json_ok} schema={schema}"
        f" prec={scores['precision']:.1f} rec={scores['recall']:.1f}"
        f" {latency_ms:.0f}ms {tps:.0f}tok/s{think_warn}"
    )
    stripped = strip_think_tags(content)
    preview = stripped[:200].replace("\n", " ")
    print(f' → "{preview}{"…" if len(stripped) > 200 else ""}"')


def _print_conversation_verbose(case, content, scores, latency_ms, tps):
    fok = "✓" if scores["forbidden_ok"] else f"✗ {scores['forbidden_hits']}"
    concise = "✓" if scores["concise_ok"] else f"✗({scores['word_count']}w)"
    print(
        f"\n [{case.name}] score={scores['score']:.2f} forbidden={fok}"
        f" concise={concise} {latency_ms:.0f}ms {tps:.0f}tok/s"
    )
    preview = content[:200].replace("\n", " ")
    print(f" → {preview}{'…' if len(content) > 200 else ''}")


def _print_long_turn_verbose(turn, content, scores, latency_ms, tps, turn_num):
    recall_marker = ""
    if scores["recall_ok"] is True:
        recall_marker = " ✓recall"
    elif scores["recall_ok"] is False:
        recall_marker = (
            f" ✗recall(expected '{turn.recall_check[1] if turn.recall_check else '?'}')"
        )
    personality_marker = (
        "" if scores["personality_ok"] else f" ✗{scores['forbidden_hits']}"
    )
    print(
        f"\n [turn {turn_num:02d}] score={scores['score']:.2f}"
        f" {latency_ms:.0f}ms {tps:.0f}tok/s{recall_marker}{personality_marker}"
    )
    print(f" > {turn.user[:80]}")
    preview = content[:200].replace("\n", " ")
    print(f" → {preview}{'…' if len(content) > 200 else ''}")


def _long_conv_stats(r: BenchmarkResult) -> dict:
    """Compute long-conversation derived stats for a BenchmarkResult."""
    recall_turns = [
        (i, s) for i, s in enumerate(r.case_scores) if s.get("recall_ok") is not None
    ]
    recall_pct = (
        sum(1 for _, s in recall_turns if s["recall_ok"]) / len(recall_turns)
        if recall_turns
        else 1.0
    )
    pers_pct = (
        sum(1 for s in r.case_scores if s.get("personality_ok", True)) / len(r.case_scores)
        if r.case_scores
        else 1.0
    )
    slope = _linear_slope(r.latencies_ms)
    early = [s["score"] for s in r.case_scores[:10]]
    late = [s["score"] for s in r.case_scores[-10:]]
    quality_drop = (
        (sum(early) / len(early) - sum(late) / len(late)) if early and late else 0.0
    )
    return {
        "recall_pct": recall_pct,
        "pers_pct": pers_pct,
        "slope": slope,
        "quality_drop": quality_drop,
        "recall_turns": recall_turns,
    }


def _col(s, width):
    return str(s)[:width].ljust(width)


def print_summary_table(results: list[BenchmarkResult]):
    """Print a side-by-side comparison table."""
    sep = "-" * 90
    print(f"\n{'=' * 90}")
    print("BENCHMARK RESULTS SUMMARY")
    print(f"{'=' * 90}\n")
    # Group by role
    fast_results = [r for r in results if r.role == "fast"]
    main_results = [r for r in results if r.role == "main"]
    long_results = [r for r in results if r.role == "long"]
    # Compute overall score per model (avg across whichever roles were run)
    scores_by_model: dict[str, list[float]] = {}
    for r in results:
        scores_by_model.setdefault(r.model, []).append(r.avg_score)
    overall_by_model = {m: sum(v) / len(v) for m, v in scores_by_model.items()}

    def sort_key(r: BenchmarkResult) -> float:
        return -overall_by_model.get(r.model, 0.0)

    # --- OVERALL section ---
    all_models = sorted(overall_by_model, key=lambda m: -overall_by_model[m])
    if len({r.role for r in results}) > 1:
        print(" OVERALL SCORE (avg across all roles run)")
        print(f" {sep[:80]}")
        role_cols = []
        if fast_results:
            role_cols.append(("Fast", fast_results))
        if main_results:
            role_cols.append(("Main", main_results))
        if long_results:
            role_cols.append(("Long", long_results))
        header = (
            f" {'Model':<35}"
            + "".join(f"{lbl:>8}" for lbl, _ in role_cols)
            + f"{'Overall':>9}"
        )
        print(header)
        print(f" {sep[:80]}")
        for model in all_models:
            short = model.replace("ollama/", "").replace("ollama_chat/", "")
            row = f" {_col(short, 35)}"
            for _, role_res in role_cols:
                match = next((r for r in role_res if r.model == model), None)
                row += f"{match.avg_score:>8.2f}" if match else f"{'—':>8}"
            row += f"{overall_by_model[model]:>9.2f}"
            print(row)
        print()
    # --- Per-role sections ---
    role_sections = [
        ("FAST MODEL (memory extraction)", fast_results, False),
        ("MAIN MODEL (conversation)", main_results, False),
        ("LONG CONVERSATION (60-turn session)", long_results, True),
    ]
    for role_label, role_results, is_long in role_sections:
        if not role_results:
            continue
        ordered = sorted(role_results, key=sort_key)
        print(f" {role_label}")
        print(f" {sep[:80]}")
        if is_long:
            header = f" {'Model':<35} {'Score':>6} {'Recall':>7} {'Pers%':>6} {'Tok/s':>6} {'Lat↑ms/t':>9} {'Qlty↓':>6}"
            print(header)
            print(f" {sep[:80]}")
            for r in ordered:
                short = r.model.replace("ollama/", "").replace("ollama_chat/", "")
                st = _long_conv_stats(r)
                print(
                    f" {_col(short, 35)} "
                    f"{r.avg_score:>6.2f} "
                    f"{st['recall_pct']:>6.0%} "
                    f"{st['pers_pct']:>5.0%} "
                    f"{r.avg_tps:>6.0f} "
                    f"{st['slope']:>+9.1f} "
                    f"{st['quality_drop']:>+6.2f}"
                )
                for i, s in st["recall_turns"]:
                    if not s["recall_ok"]:
                        turn = LONG_CONVERSATION_TURNS[i]
                        label = turn.recall_check[0] if turn.recall_check else "multi"
                        print(f" ✗ turn {i + 1:02d} recall failed ({label})")
        else:
            header = f" {'Model':<35} {'Score':>6} {'Lat(ms)':>8} {'P90(ms)':>8} {'Tok/s':>6} {'Errors':>6}"
            print(header)
            print(f" {sep[:80]}")
            for r in ordered:
                short = r.model.replace("ollama/", "").replace("ollama_chat/", "")
                print(
                    f" {_col(short, 35)} "
                    f"{r.avg_score:>6.2f} "
                    f"{r.avg_latency_ms:>8.0f} "
                    f"{r.p90_latency_ms:>8.0f} "
                    f"{r.avg_tps:>6.0f} "
                    f"{r.errors:>6}"
                )
        print()
        # Per-case breakdown
        all_cases = ordered[0].case_names if ordered else []
        if all_cases:
            print(" Per-case scores:")
            case_header = f" {'Case':<30}" + "".join(
                f"{r.model.replace('ollama/', '')[:10]:>12}" for r in ordered
            )
            print(case_header)
            print(f" {sep[:80]}")
            for i, case_name in enumerate(all_cases):
                row = f" {_col(case_name, 30)}"
                for r in ordered:
                    row += (
                        f"{r.case_scores[i].get('score', 0.0):>12.2f}"
                        if i < len(r.case_scores)
                        else f"{'N/A':>12}"
                    )
                print(row)
            print()
    print(f"{'=' * 90}")
    print("Score: 0.0–1.0 composite. Higher = better.")
    if fast_results:
        print(
            "Fast: JSON valid (25%) + count (15%) + schema (25%) + precision (20%) + recall (15%)"
        )
    if main_results:
        print("Main: no forbidden phrases (50%) + concise (30%) + has opinion (20%)")
    if long_results:
        print("Long: personality per-turn (40%) + recall accuracy (60% on recall turns)")
        print(" Lat↑ms/t = latency slope ms/turn | Qlty↓ = score drop turns 1-10 → 51-60")


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def parse_model_arg(arg: str, default_api_base: str | None) -> ModelConfig:
    """Parse 'model_name@http://endpoint' or just 'model_name'."""
    if "@" in arg:
        name, api_base = arg.rsplit("@", 1)
        return ModelConfig(name=name.strip(), api_base=api_base.strip())
    return ModelConfig(name=arg.strip(), api_base=default_api_base)
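# Examples of accepted --model values (endpoints are the placeholder hosts from
# the module docstring):
#   "ollama/qwen3.5:27b"                            -> uses --api-base
#   "ollama/qwen3.5:9b@http://192.168.50.34:11435"  -> per-model endpoint
# rsplit("@", 1) splits on the last '@', so an '@' earlier in the model name is
# left intact.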
async def main():
    parser = argparse.ArgumentParser(description="Benchmark LLMs for SR2 agent roles")
    parser.add_argument(
        "--model",
        "-m",
        action="append",
        dest="models",
        required=True,
        help="Model to benchmark. Format: 'ollama/name' or 'ollama/name@http://endpoint'. "
        "Can be specified multiple times.",
    )
    parser.add_argument(
        "--api-base",
        default=None,
        help="Default Ollama API base URL (used when model has no @ endpoint).",
    )
    parser.add_argument(
        "--role",
        choices=["fast", "main", "long", "all"],
        default="all",
        help="Which role to benchmark: fast (memory extraction), main (conversation), long (60-turn session), all.",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Print each test case response and scores.",
    )
    parser.add_argument(
        "--no-think",
        action="store_true",
        help="Prepend /no_think to prompts to disable chain-of-thought (qwen3 series). "
        "Recommended for fast-model benchmarks to prevent thinking tokens from eating the budget.",
    )
    args = parser.parse_args()
    model_configs = [parse_model_arg(m, args.api_base) for m in args.models]
    roles = ["fast", "main", "long"] if args.role == "all" else [args.role]
    all_results: list[BenchmarkResult] = []
    for model_cfg in model_configs:
        short = model_cfg.name.replace("ollama/", "")
        print(
            f"Starting test for model {short}; warming up so the tests don't run on a cold start"
        )
        warmup_content, _, warmup_tps, _ = await call_model(
            model_cfg, "Just say hi back", "Hi", max_tokens=256
        )
        print(f"Warmup complete with return '{warmup_content}' at {warmup_tps:.1f}tps")
        for role in roles:
            label = {
                "fast": "memory extraction",
                "main": "conversation",
                "long": "60-turn session",
            }[role]
            print(f"\nBenchmarking {short} [{label}] ...")
            if role == "fast":
                result = await run_extraction_benchmark(model_cfg, args.verbose, args.no_think)
            elif role == "main":
                result = await run_conversation_benchmark(
                    model_cfg, args.verbose, args.no_think
                )
            else:
                result = await run_long_conversation_benchmark(
                    model_cfg, args.verbose, args.no_think
                )
            n = len(result.case_scores)
            print(
                f" Done: {n} cases, avg_score={result.avg_score:.2f}, "
                f"avg_lat={result.avg_latency_ms:.0f}ms, "
                f"avg_tps={result.avg_tps:.0f} tok/s, "
                f"errors={result.errors}"
            )
            all_results.append(result)
    print_summary_table(all_results)


if __name__ == "__main__":
    asyncio.run(main())