-
-
Save nmamano/00a26543293519f6ef394fe447321033 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
Translate a problem's Python solution into Go using an LLM.

Run with:
    uv tool run --from openai python scripts/test_deepseek_local.py FOLDER_NAME

For the local provider, this assumes a llama server is running on
http://127.0.0.1:8080. Start it first with:
    $ cd /c/llm/tools/llama-bins
    $ ./llama-server.exe \
        -m /c/llm/models/DeepSeek-Coder-V2-Lite-Instruct_Q6_K.gguf \
        --port 8080 \
        --ctx-size 8192 \
        -ngl 999
It should show something like this at the end of the output:
    main: server is listening on http://127.0.0.1:8080
    main: starting the main loop...
"""
import argparse
import json
import os
import shutil
import subprocess
import sys
import textwrap
from pathlib import Path

from openai import OpenAI
# How many model attempts before giving up on a working translation.
RETRY_COUNT = 3

parser = argparse.ArgumentParser(
    description="Translate a Python solution into Go using an LLM."
)
parser.add_argument("problem_dir", help="Path to the problem directory")
parser.add_argument(
    "--llm-provider",
    choices=["local", "cursor-agent", "openai"],
    default="openai",
    help="Which LLM backend to use (default: openai)",
)
parser.add_argument(
    "--openai-model",
    default="gpt-5.1",
    help="Override OPENAI_MODEL for the openai provider (default gpt-5.1)",
)
parser.add_argument(
    "--context-size",
    type=int,
    default=8192,
    help="Approximate model context window (for warning only). Default: 8192.",
)
parser.add_argument(
    "--see-prompt",
    action="store_true",
    help="Print the composed prompt and exit without calling any LLMs.",
)
args = parser.parse_args()

# Module-level configuration derived from the CLI arguments; read by the
# functions below rather than passed around explicitly.
PROBLEM_DIR = Path(args.problem_dir)
LLM_PROVIDER = args.llm_provider
OPENAI_MODEL = args.openai_model
CONTEXT_SIZE = args.context_size

# Input/output file locations inside the problem directory.
PYTHON_SOLUTION = PROBLEM_DIR / "solution.py"
GO_SOLUTION = PROBLEM_DIR / "solution.go"
BAD_GO_SOLUTION = PROBLEM_DIR / "bad_solution.go"
# e.g. problems/binary-search/cctv-footage -> "cctv-footage"
PROBLEM_SLUG = PROBLEM_DIR.name
# Marker prefix for the machine-readable result line emitted by emit_result().
RESULT_PREFIX = "__RESULT__ "
LOCAL_MODEL = "DeepSeek-Coder-V2-Lite-Instruct_Q6_K.gguf"
# Lazily initialized by init_client_if_needed(); stays None in --see-prompt mode.
client: OpenAI | None = None
# System prompt for the translator: strict structural translation rules plus
# two few-shot examples (nested helpers / slices.Max, and a class-based
# solution). NOTE(review): the indentation inside the example code below was
# reconstructed from a whitespace-mangled copy — verify against the original.
NEW_SYSTEM_PROMPT = """
You are doing STRICT mechanical code translation from Python to Go for an
educational codebase. The Go code must mirror the Python structure as closely
as possible, including comments.
1. TOP-LEVEL ENTITIES:
- The set of top-level functions, classes, and types in Go MUST MATCH the
Python code EXACTLY EXCEPT for being camelCase.
- Do NOT introduce any new top-level helpers that are not in the Python
solution.
- If you need MAX or MIN over a range, use slices.Max and slices.Min. If you
need MAX or MIN over 2 values, use the built-in max and min. If you need
to SUM over a range, just do a direct loop. Do NOT declare min, max, swap, or
sum as top-level entities. You can also use strings.ContainsRune and math.Abs.
- Do NOT remove, rename, or split existing top-level entities.
- If you need a queue or deque, import and use the container/list standard library package. If you need a heap, import container/heap.
- If you need to import a Union-Find class, import it from package "book-code/data_structures/go" and rename the package to "ds". It has methods ds.NewUnionFind() which gives a type with Add(x int), Find(x int), Union(x, y int). The type also has public Parent and Size maps (map[int]int). Only Find returns a value which is an int.
- If you need to create a new class and you need a constructor, create a NewClassName function.
2. NESTED FUNCTIONS:
- If a helper function is NESTED inside another function in Python, it MUST
ALSO be nested inside the corresponding function in Go using
name := func(...). Do NOT lift nested helpers to the top level, even if
that would be more idiomatic Go. Go supports functions inside functions:
you MUST use that for nested Python helpers.
- If a nested function is called recursively, declare it first and then assign to it. Avoid closure factories; simply define the nested function when everything needed is already in scope.
3. BEHAVIOR:
- The behavior must stay exactly the same and pass all the same tests.
4. STYLE:
- Use normal Go syntax and naming conversions, but NEVER at the expense of
the structural rules above. gofmt-compatible formatting is desired, but
structure is more important than idiomaticity.
- You are NOT allowed to reinterpret or normalize sentinel values, bounds,
or initial conditions. Numeric constants that define search bounds (such
as l = 0, r = max(...), etc.) must be preserved exactly unless the Python
code would be invalid in Go.
- For acronyms in top-level entity names, ONLY capitalize the first letter.
For example, write tunnelDepthDfs, NOT tunnelDepthDFS.
5. TESTS:
- Do NOT strip comments from the Python solution, even from the tests.
- Translate Python's asserts as panic(fmt.Sprintf(...)).
Output ONLY the Go source code, no other text. Do NOT wrap the code in a
markdown code block.
Here are two examples of correct translations from Python to Go.
They are provided only as references. Do NOT translate or modify
them. After the examples, you will find the actual Python solution
that you must translate.
=== Example 1: Nested helpers and slices.Max ===
=== Python (input) ===
import math
def min_pages_per_day(page_counts, days):
    # How many days it takes to finish the book with a given daily page limit.
    def days_to_finish(daily_limit):
        d = 0
        for pages in page_counts:
            # Ceiling division to handle leftover pages.
            d += math.ceil(pages / daily_limit)
        return d
    # Defines a transition point over the range of # of pages per day (daily_limit).
    # In the 'before' region, the daily limit is not enough to finish the book in time.
    # In the 'after' region, the daily limit is enough to finish the book in time.
    def is_before(daily_limit):
        return days_to_finish(daily_limit) > days
    l = 0
    # In case we have more days than max pages in any chapter,
    # we might need to read as little as 1 page per day.
    r = max(page_counts)
    # Binary search for the transition point from 'before' to 'after' region.
    while r - l > 1:
        mid = (l + r) // 2
        if is_before(mid):
            l = mid
        else:
            r = mid
    # Return the first value in the 'after' region, i.e., the smallest daily limit
    # that allows us to finish the book in time.
    return r
def run_tests():
    tests = [
        # Example from book
        ([20, 15, 17, 10], 5, 17),
        ([20, 15, 17, 10], 14, 5),
        ([20, 15, 17, 10], 17, 4),
        # Edge case - single chapter
        ([10], 5, 2),
        # Edge case - days = chapters
        ([1, 2, 3], 3, 3),
        # Edge case - more days than max chapter pages
        ([20], 21, 1)
    ]
    for page_counts, days, want in tests:
        got = min_pages_per_day(page_counts, days)
        assert got == want, f"\\nmin_pages_per_day({page_counts}, {days}): got: {
            got}, want: {want}\\n"
if __name__ == "__main__":
    run_tests()
=== Go (output) ===
package main
import (
	"fmt"
	"math"
	"slices"
)
func minPagesPerDay(pageCounts []int, days int) int {
	// How many days it takes to finish the book with a given daily page limit.
	daysToFinish := func(dailyLimit int) int {
		d := 0
		for _, pages := range pageCounts {
			// Ceiling division to handle leftover pages.
			d += int(math.Ceil(float64(pages) / float64(dailyLimit)))
		}
		return d
	}
	// Defines a transition point over the range of # of pages per day (dailyLimit).
	// In the 'before' region, the daily limit is not enough to finish the book in time.
	// In the 'after' region, the daily limit is enough to finish the book in time.
	isBefore := func(dailyLimit int) bool {
		return daysToFinish(dailyLimit) > days
	}
	l := 0
	// In case we have more days than max pages in any chapter,
	// we might need to read as little as 1 page per day.
	r := slices.Max(pageCounts)
	// Binary search for the transition point from 'before' to 'after' region.
	for r-l > 1 {
		mid := (l + r) / 2
		if isBefore(mid) {
			l = mid
		} else {
			r = mid
		}
	}
	// Return the first value in the 'after' region, i.e., the smallest daily limit
	// that allows us to finish the book in time.
	return r
}
func runTests() {
	tests := []struct {
		pageCounts []int
		days       int
		want       int
	}{
		// Example from book
		{[]int{20, 15, 17, 10}, 5, 17},
		{[]int{20, 15, 17, 10}, 14, 5},
		{[]int{20, 15, 17, 10}, 17, 4},
		// Edge case - single chapter
		{[]int{10}, 5, 2},
		// Edge case - days = chapters
		{[]int{1, 2, 3}, 3, 3},
		// Edge case - more days than max chapter pages
		{[]int{20}, 21, 1},
	}
	for _, test := range tests {
		got := minPagesPerDay(test.pageCounts, test.days)
		if got != test.want {
			panic(fmt.Sprintf("\\nminPagesPerDay(%v, %d): got: %d, want: %d\\n", test.pageCounts, test.days, got, test.want))
		}
	}
}
func main() {
	runTests()
}
------------------------------------------------------------------------
=== Example 2: Class-based solution ===
=== Python (input) ===
class TideAerialView:
    # Time: O(log n)
    # Space: O(1)
    def get_ones_in_row(self, row):
        if row[0] == 0:
            return 0
        if row[-1] == 1:
            return len(row)
        def is_before(idx):
            return row[idx] == 1
        l, r = 0, len(row)
        while r - l > 1:
            mid = (l + r) // 2
            if is_before(mid):
                l = mid
            else:
                r = mid
        return r
    # Time: O(n log n)
    # Space: O(1)
    def get_ones_in_picture(self, picture):
        ones = 0
        for row in picture:
            ones += self.get_ones_in_row(row)
        return ones
    # Time: O((log k) * n log n)
    # Space: O(1)
    def solve(self, pictures):
        def is_before(picture):
            water = self.get_ones_in_picture(picture)
            total = len(picture[0])**2
            return water / total < 0.5
        if not is_before(pictures[0]):
            return 0
        if is_before(pictures[-1]):
            return len(pictures) - 1
        l, r = 0, len(pictures) - 1
        while r - l > 1:
            mid = (l + r) // 2
            if is_before(pictures[mid]):
                l = mid
            else:
                r = mid
        # Return the closest one to the midpoint, or l in case of a tie
        l_water = self.get_ones_in_picture(pictures[l])
        r_water = self.get_ones_in_picture(pictures[r])
        mid_point = len(pictures[0])**2 / 2
        return l if abs(l_water - mid_point) <= abs(r_water - mid_point) else r
def run_tests():
    tests = [
        # Example from the book
        ([[[0, 0, 0],
           [0, 0, 0],
           [0, 0, 0]],
          [[1, 0, 0],
           [0, 0, 0],
           [1, 0, 0]],
          [[1, 1, 0],
           [0, 0, 0],
           [1, 0, 0]],
          [[1, 1, 0],
           [1, 1, 1],
           [1, 0, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [1, 1, 0]]], 2),
        # 3 pictures with increasing water
        ([[[1, 0, 0],
           [1, 0, 0],
           [1, 0, 0]],
          [[1, 1, 0],
           [1, 1, 0],
           [1, 0, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [1, 0, 0]]], 1),
        # 2 pictures
        ([[[1, 0],
           [0, 0]],
          [[1, 1],
           [1, 0]]], 0),
        # Incremental progression
        ([[[0, 0, 0],
           [0, 0, 0],
           [0, 0, 0]],
          [[1, 0, 0],
           [0, 0, 0],
           [0, 0, 0]],
          [[1, 0, 0],
           [1, 0, 0],
           [0, 0, 0]],
          [[1, 1, 0],
           [1, 0, 0],
           [0, 0, 0]],
          [[1, 1, 1],
           [1, 0, 0],
           [0, 0, 0]],
          [[1, 1, 1],
           [1, 1, 0],
           [0, 0, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [0, 0, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [1, 0, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [1, 1, 0]],
          [[1, 1, 1],
           [1, 1, 1],
           [1, 1, 1]],
         ], 4),
        # Edge case - single picture
        ([[[1, 1], [0, 0]]], 0),
        # Edge case - all water
        ([[[1, 1], [1, 1]]], 0),
        # Edge case - all land
        ([[[0, 0], [0, 0]]], 0)
    ]
    for pictures, want in tests:
        got = TideAerialView().solve(pictures)
        assert got == want, f"\\ntide_aerial_view({pictures}): got: {
            got}, want: {want}\\n"
if __name__ == "__main__":
    run_tests()
=== Go (output) ===
package main
import (
	"fmt"
	"math"
)
type TideAerialView struct{}
// Time: O(log n)
// Space: O(1)
func (t *TideAerialView) getOnesInRow(row []int) int {
	if row[0] == 0 {
		return 0
	}
	if row[len(row)-1] == 1 {
		return len(row)
	}
	isBefore := func(idx int) bool {
		return row[idx] == 1
	}
	l, r := 0, len(row)
	for r-l > 1 {
		mid := (l + r) / 2
		if isBefore(mid) {
			l = mid
		} else {
			r = mid
		}
	}
	return r
}
// Time: O(n log n)
// Space: O(1)
func (t *TideAerialView) getOnesInPicture(picture [][]int) int {
	ones := 0
	for _, row := range picture {
		ones += t.getOnesInRow(row)
	}
	return ones
}
// Time: O((log k) * n log n)
// Space: O(1)
func (t *TideAerialView) solve(pictures [][][]int) int {
	isBefore := func(picture [][]int) bool {
		water := t.getOnesInPicture(picture)
		total := len(picture[0]) * len(picture[0])
		return float64(water)/float64(total) < 0.5
	}
	if !isBefore(pictures[0]) {
		return 0
	}
	if isBefore(pictures[len(pictures)-1]) {
		return len(pictures) - 1
	}
	l, r := 0, len(pictures)-1
	for r-l > 1 {
		mid := (l + r) / 2
		if isBefore(pictures[mid]) {
			l = mid
		} else {
			r = mid
		}
	}
	// Return the closest one to the midpoint, or l in case of a tie
	lWater := t.getOnesInPicture(pictures[l])
	rWater := t.getOnesInPicture(pictures[r])
	midPoint := float64(len(pictures[0])*len(pictures[0])) / 2
	if math.Abs(float64(lWater)-midPoint) <= math.Abs(float64(rWater)-midPoint) {
		return l
	}
	return r
}
func runTests() {
	tests := []struct {
		pictures [][][]int
		want     int
	}{
		// Example from the book
		{
			[][][]int{
				{{0, 0, 0}, {0, 0, 0}, {0, 0, 0}},
				{{1, 0, 0}, {0, 0, 0}, {1, 0, 0}},
				{{1, 1, 0}, {0, 0, 0}, {1, 0, 0}},
				{{1, 1, 0}, {1, 1, 1}, {1, 0, 0}},
				{{1, 1, 1}, {1, 1, 1}, {1, 1, 0}},
			},
			2,
		},
		// 3 pictures with increasing water
		{
			[][][]int{
				{{1, 0, 0}, {1, 0, 0}, {1, 0, 0}},
				{{1, 1, 0}, {1, 1, 0}, {1, 0, 0}},
				{{1, 1, 1}, {1, 1, 1}, {1, 0, 0}},
			},
			1,
		},
		// 2 pictures
		{
			[][][]int{
				{{1, 0}, {0, 0}},
				{{1, 1}, {1, 0}},
			},
			0,
		},
		// Incremental progression
		{
			[][][]int{
				{{0, 0, 0}, {0, 0, 0}, {0, 0, 0}},
				{{1, 0, 0}, {0, 0, 0}, {0, 0, 0}},
				{{1, 0, 0}, {1, 0, 0}, {0, 0, 0}},
				{{1, 1, 0}, {1, 0, 0}, {0, 0, 0}},
				{{1, 1, 1}, {1, 0, 0}, {0, 0, 0}},
				{{1, 1, 1}, {1, 1, 0}, {0, 0, 0}},
				{{1, 1, 1}, {1, 1, 1}, {0, 0, 0}},
				{{1, 1, 1}, {1, 1, 1}, {1, 0, 0}},
				{{1, 1, 1}, {1, 1, 1}, {1, 1, 0}},
				{{1, 1, 1}, {1, 1, 1}, {1, 1, 1}},
			},
			4,
		},
		// Edge case - single picture
		{
			[][][]int{
				{{1, 1}, {0, 0}},
			},
			0,
		},
		// Edge case - all water
		{
			[][][]int{
				{{1, 1}, {1, 1}},
			},
			0,
		},
		// Edge case - all land
		{
			[][][]int{
				{{0, 0}, {0, 0}},
			},
			0,
		},
	}
	for _, test := range tests {
		got := new(TideAerialView).solve(test.pictures)
		if got != test.want {
			panic(fmt.Sprintf("\\ntideAerialView(%v): got: %v, want: %v\\n", test.pictures, got, test.want))
		}
	}
}
func main() {
	runTests()
}
---------------- END OF EXAMPLES ----------------
"""
SYSTEM_PROMPT = NEW_SYSTEM_PROMPT.strip()
def ensure_cursor_agent_available() -> None:
    """Exit with status 1 if the `cursor-agent` binary is not on PATH."""
    if shutil.which("cursor-agent") is not None:
        return
    sys.stderr.write(
        "Error: cursor-agent not found in PATH. Install it or choose --llm-provider local.\n"
    )
    sys.exit(1)
def estimate_tokens(text: str) -> int:
    """Crude token count for context warnings: ~4 characters per token, min 1."""
    quarter = len(text) // 4
    if quarter < 1:
        return 1
    return quarter
def warn_if_near_context(prompt_tokens: int, context_size: int, label: str) -> None:
    """Print a warning when the prompt is at or above 85% of the context window."""
    if context_size <= 0:
        return
    threshold = int(context_size * 0.85)
    if prompt_tokens < threshold:
        return
    print(
        f"[warn] {label} prompt is large "
        f"({prompt_tokens} est tokens vs context {context_size})."
    )
def emit_result(
    status: str, failure_stage: str | None = None, error_message: str | None = None
) -> None:
    """Print one machine-readable result line prefixed with RESULT_PREFIX.

    Keys are emitted in a fixed order: status, then (if set) failure_stage
    and error_message.
    """
    payload: dict[str, str] = {"status": status}
    optional = (("failure_stage", failure_stage), ("error_message", error_message))
    for key, value in optional:
        if value:
            payload[key] = value
    encoded = json.dumps(payload, ensure_ascii=False)
    print(RESULT_PREFIX + encoded)
def build_user_prompt(
    python_source: str,
    prev_go: str | None,
    error_msg: str | None,
) -> str:
    """Compose the user message for the model.

    First attempt (no prev_go/error_msg): just ask for a translation of the
    Python source. Retries: include the previous Go attempt plus the combined
    `go run` / parser.py output so the model can correct it.
    """
    # parser.py runs only after `go run` succeeded, so seeing the success
    # banner in error_msg means the previous Go version already passed tests.
    tests_were_passing = error_msg is not None and "SUCCESS: all tests passed" in error_msg
    if prev_go is None and error_msg is None:
        return f"""\
Translate this Python solution to Go:
{python_source}
""".strip()
    extra_hint = ""
    if tests_were_passing:
        extra_hint = textwrap.dedent(
            """\
Important: The previous Go version ALREADY PASSED ALL TESTS. You MUST keep
its behavior exactly the same and only make the minimal structural changes
required to satisfy the parser error. Do NOT change any logic that affects
outputs or test results.
"""
        )
    # NOTE: dedent runs after f-string interpolation; the interpolated source
    # lines start at column 0, so the literal below must also stay at column 0
    # for dedent to behave as a no-op rather than leaving stray indentation.
    return textwrap.dedent(
        f"""\
Here is the original Python solution:
{python_source}
Here is the previous Go translation attempt (which failed to compile, pass
the tests, or pass the parser checks):
{prev_go}
Here is the full output from `go run` and/or `parser.py`:
{error_msg}
{extra_hint}Produce a corrected Go version that compiles, passes the tests,
and satisfies the parser checks, following the same rules as before. Output
ONLY the source code, no other text. Do NOT wrap the code in a markdown
codeblock.
"""
    ).strip()
def normalize_model_output(raw: str) -> str:
    """Strip whitespace and any surrounding markdown fence; ensure one trailing newline."""
    cleaned = raw.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line, and the closing fence line if present.
        body = cleaned.splitlines()[1:]
        if body and body[-1].strip().startswith("```"):
            body.pop()
        cleaned = "\n".join(body).strip()
    return cleaned + "\n"
def call_model_local(user_content: str, temperature: float) -> str:
    """Send the prompt to the local llama-server via its OpenAI-compatible API.

    Raises RuntimeError if the client is not configured or the model returns
    an empty completion.
    """
    if client is None:
        raise RuntimeError("Local LLM provider requested but OpenAI client is not configured")
    resp = client.chat.completions.create(
        model=LOCAL_MODEL,
        temperature=temperature,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
    )
    # message.content is Optional in the OpenAI SDK; guard before .strip() so
    # an empty/refused completion fails with a clear error, not AttributeError.
    content = resp.choices[0].message.content
    if content is None:
        raise RuntimeError("Local model returned an empty completion")
    return normalize_model_output(content.strip())
def call_model_openai(user_content: str, temperature: float) -> str:
    """Send the prompt to the hosted OpenAI API using OPENAI_MODEL.

    Raises RuntimeError if the client is not configured or the model returns
    an empty completion.
    """
    if client is None:
        raise RuntimeError("OpenAI provider requested but OpenAI client is not configured")
    resp = client.chat.completions.create(
        model=OPENAI_MODEL,
        temperature=temperature,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": user_content},
        ],
    )
    # message.content is Optional in the OpenAI SDK; guard before .strip() so
    # an empty/refused completion fails with a clear error, not AttributeError.
    content = resp.choices[0].message.content
    if content is None:
        raise RuntimeError("OpenAI model returned an empty completion")
    return normalize_model_output(content.strip())
def call_model_cursor_agent(prompt: str) -> str:
    """Invoke the cursor-agent CLI in print mode and return its normalized stdout."""
    cmd = ["cursor-agent", "-p", prompt, "--output-format", "text"]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        details = proc.stderr.strip()
        if not details:
            details = proc.stdout.strip()
        raise RuntimeError(f"cursor-agent failed:\n{details}")
    if not proc.stdout.strip():
        raise RuntimeError("cursor-agent returned empty output")
    return normalize_model_output(proc.stdout)
def call_model(
    python_source: str,
    prev_go: str | None,
    error_msg: str | None,
    temperature: float = 0.0,
) -> str:
    """Build the prompt, warn if near the context limit, and dispatch to the backend."""
    user_content = build_user_prompt(python_source, prev_go, error_msg)
    full_prompt = f"{SYSTEM_PROMPT}\n\n{user_content}"
    warn_if_near_context(estimate_tokens(full_prompt), CONTEXT_SIZE, "LLM")
    # cursor-agent has no system/user split, so it gets the combined prompt.
    dispatch = {
        "cursor-agent": lambda: call_model_cursor_agent(full_prompt),
        "openai": lambda: call_model_openai(user_content, temperature),
    }
    handler = dispatch.get(LLM_PROVIDER)
    if handler is not None:
        return handler()
    return call_model_local(user_content, temperature)
def try_compile_and_run(go_file: Path) -> tuple[bool, str]:
    """Run `go run` and return (success, combined_output)."""
    proc = subprocess.run(
        ["go", "run", str(go_file)],
        capture_output=True,
        text=True,
    )
    combined = f"{proc.stdout}\n{proc.stderr}".strip()
    return proc.returncode == 0, combined
def run_gofmt() -> None:
    """Format the generated Go solution in place; raises CalledProcessError on failure."""
    cmd = ["gofmt", "-w", str(GO_SOLUTION)]
    subprocess.run(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        check=True,
    )
def try_parser_check() -> tuple[bool, str]:
    """
    Run parser.py --lang go --problem <slug> with the current Python
    and return (success, combined_output).

    Assumes we are being run from the repo root (same as parser.py).
    """
    cmd = [sys.executable, "parser.py", "--lang", "go", "--problem", PROBLEM_SLUG]
    proc = subprocess.run(cmd, capture_output=True, text=True)
    combined = "\n".join((proc.stdout, proc.stderr)).strip()
    return proc.returncode == 0, combined
def trim_parser_output(output: str, skip: int = 6) -> str:
    """Drop the first `skip` lines from parser.py output when printing."""
    remaining = output.splitlines()[skip:]
    if not remaining:
        return ""
    return "\n".join(remaining)
def init_client_if_needed() -> None:
    """
    Initialize the global OpenAI client (local or hosted) or verify that
    cursor-agent is installed. A no-op in --see-prompt mode, where no LLM
    calls are ever made.
    """
    global client
    if args.see_prompt:
        return
    if LLM_PROVIDER == "cursor-agent":
        ensure_cursor_agent_available()
        return
    if LLM_PROVIDER == "local":
        # llama-server ignores the key, but the SDK requires a non-empty one.
        client = OpenAI(base_url="http://127.0.0.1:8080/v1", api_key="sk-local")
        return
    if LLM_PROVIDER == "openai":
        api_key = os.environ.get("OPENAI_API_KEY")
        if not api_key:
            print(
                "Error: OPENAI_API_KEY not set. Export it or choose another provider.",
                file=sys.stderr,
            )
            sys.exit(1)
        base_url = os.environ.get("OPENAI_BASE_URL")
        client = OpenAI(api_key=api_key, base_url=base_url or None)
def main() -> None:
    """Translate PYTHON_SOLUTION to Go, verifying with `go run` and parser.py.

    Retries up to RETRY_COUNT times, feeding the previous attempt and its
    error output back to the model. Always emits one machine-readable
    __RESULT__ line before exiting.
    """
    if not PYTHON_SOLUTION.exists():
        msg = f"Missing: {PYTHON_SOLUTION}"
        print(msg)
        emit_result("fail", failure_stage="missing_python_solution", error_message=msg)
        sys.exit(1)
    # Never keep both solution.go and bad_solution.go in the same run.
    # NOTE(review): BAD_GO_SOLUTION is deleted here but never written anywhere
    # in this script — presumably some other tool creates it; confirm.
    BAD_GO_SOLUTION.unlink(missing_ok=True)
    python_source = PYTHON_SOLUTION.read_text(encoding="utf-8")
    if args.see_prompt:
        # Print the exact prompt a first attempt would send, then exit.
        user_content = build_user_prompt(
            python_source,
            prev_go=None,
            error_msg=None,
        )
        full_prompt = f"{SYSTEM_PROMPT}\n\n{user_content}"
        print(full_prompt)
        return
    init_client_if_needed()
    # Retry state threaded between attempts: previous Go code and its errors.
    prev_go: str | None = None
    last_error: str | None = None
    failure_stage: str | None = None
    for attempt in range(1, RETRY_COUNT + 1):
        # print(f"[attempt {attempt}/{RETRY_COUNT}] calling model...")
        # First attempt is deterministic; retries add a little temperature so
        # the model does not repeat the exact same failing output.
        temperature = 0.0 if attempt == 1 else 0.2
        go_code = call_model(
            python_source,
            prev_go=prev_go,
            error_msg=last_error,
            temperature=temperature,
        )
        GO_SOLUTION.write_text(go_code, encoding="utf-8")
        print(f"[attempt {attempt}/{RETRY_COUNT}] wrote: {GO_SOLUTION}")
        # print(f"[attempt {attempt}/{RETRY_COUNT}] running go...")
        ok_run, run_output = try_compile_and_run(GO_SOLUTION)
        if not ok_run:
            print(f"[attempt {attempt}/{RETRY_COUNT}] go run failed (compile error or failing tests):")
            print(run_output)
            prev_go = go_code
            last_error = run_output
            failure_stage = "build_or_tests"
            continue
        # Only format code that actually builds and passes its tests.
        run_gofmt()
        # print(f"[attempt {attempt}/{RETRY_COUNT}] go run succeeded, running parser.py...")
        ok_parser, parser_output = try_parser_check()
        if ok_parser:
            # print(f"[success] go run {GO_SOLUTION} and parser.py both succeeded.")
            emit_result("ok")
            return
        print(f"[attempt {attempt}/{RETRY_COUNT}] parser.py failed (API / structure issue):")
        trimmed_output = trim_parser_output(parser_output)
        print(trimmed_output)
        prev_go = go_code
        # Combine both outputs so the model sees everything that went wrong.
        last_error = (
            run_output + "\n\n" + "parser.py output:\n" + trimmed_output
        ).strip()
        failure_stage = "parser"
    # All attempts exhausted: report the last failure and exit non-zero.
    print("\n[error] Go code still does not compile, pass the tests, or satisfy parser.py.")
    if last_error:
        print("\nLast `go run` / parser.py output:\n")
        print(last_error)
    emit_result(
        "fail",
        failure_stage=failure_stage or "unknown",
        error_message=last_error or "Unknown error",
    )
    sys.exit(1)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment