Last active
October 14, 2025 22:41
-
-
Save coderplay/c26329cedec70f54e83cebc3aa618bbc to your computer and use it in GitHub Desktop.
A minimal, single-file study reimplementation of the OpenAI Codex agent loop (plan, execute tools, review).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from __future__ import annotations | |
import json, os, subprocess, sys, tempfile, time, uuid | |
from shlex import quote as shlex_quote | |
from dataclasses import dataclass | |
from typing import Any, Dict, Iterable, List, Optional | |
from urllib import request | |
from urllib.error import HTTPError, URLError | |
# OpenAI credential/endpoint configuration.  The key is deliberately a
# placeholder; ensure_api_key() refuses to run until it has been replaced.
API_KEY = "replace-with-your-openai-api-key-here"
API_ROOT = "https://api.openai.com/v1"
# Model requested for every turn unless the caller overrides it.
DEFAULT_MODEL = "gpt-5-codex"
# Maximum characters of tool stdout/stderr kept before _truncate() clips.
OUTPUT_LIMIT = 4000
# System instructions sent with every /responses request.
SYSTEM_PROMPT = """You are Codex, an expert software engineering assistant.
Follow a disciplined plan-execute-review loop.
Never fabricate file contents—inspect before editing.
Prefer minimal diffs, inline reasoning, and concise replies.
The user works inside a local Git repository; keep commands reproducible."""
def _json_dumps(data: Any) -> str:
    """Serialize *data* as compact JSON, keeping non-ASCII characters intact."""
    return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
def _now() -> float:
    """Return a high-resolution timestamp suitable for measuring durations."""
    return time.perf_counter()
def _read_stream(resp: request.addinfourl) -> bytes:
    """Drain and return the response body; the connection is always closed."""
    try:
        body = resp.read()
    finally:
        resp.close()
    return body
class ResponsesClient:
    """Minimal synchronous client for the OpenAI Responses API."""

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL) -> None:
        self.api_key = api_key
        self.model = model

    def _headers(self) -> Dict[str, str]:
        """Headers attached to every call, including the experimental beta flag."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "OpenAI-Beta": "responses=experimental",
        }

    def _request(self, method: str, path: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Issue one HTTP call and decode the JSON reply.

        Raises RuntimeError (chained to the underlying urllib error) on any
        HTTP or network failure.
        """
        encoded = None if payload is None else _json_dumps(payload).encode("utf-8")
        req = request.Request(f"{API_ROOT}{path}", data=encoded, headers=self._headers(), method=method)
        try:
            with request.urlopen(req) as resp:
                raw = _read_stream(resp)
        except HTTPError as err:
            error_body = err.read().decode("utf-8", errors="ignore")
            raise RuntimeError(f"OpenAI request failed: {err.code} {err.reason}: {error_body}") from err
        except URLError as err:
            raise RuntimeError(f"Failed to reach OpenAI: {err}") from err
        return json.loads(raw.decode("utf-8"))

    def create(self, history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Start a model turn carrying the full conversation and tool schemas."""
        body = {
            "model": self.model,
            "instructions": SYSTEM_PROMPT,
            "input": history,
            "tools": TOOL_DEFS,
            "tool_choice": "auto",
            "parallel_tool_calls": False,
            "store": False,
        }
        return self._request("POST", "/responses", body)

    def retrieve(self, response_id: str) -> Dict[str, Any]:
        """Fetch the current state of a previously created response."""
        return self._request("GET", f"/responses/{response_id}")
def _truncate(text: Optional[str], limit: int = OUTPUT_LIMIT) -> str:
    """Clamp *text* to *limit* characters, noting how much was dropped.

    ``None`` is rendered as the empty string; short text passes through.
    """
    if text is None:
        return ""
    if len(text) <= limit:
        return text
    dropped = len(text) - limit
    return text[:limit] + f"\n... [truncated {dropped} chars]"
def make_message(role: str, text: str, kind: str = "input_text") -> Dict[str, Any]:
    """Build a Responses-API message item holding a single text content part."""
    content_part = {"type": kind, "text": text}
    return {"type": "message", "role": role, "content": [content_part]}
def make_function_call(call_id: str, name: str, arguments: str) -> Dict[str, Any]:
    """Build a function_call history item mirroring a model tool invocation."""
    item: Dict[str, Any] = {"type": "function_call"}
    item["call_id"] = call_id
    item["name"] = name
    item["arguments"] = arguments
    return item
def make_function_output(call_id: str, output: str) -> Dict[str, Any]:
    """Build the history item that reports a tool call's result to the model."""
    return dict(type="function_call_output", call_id=call_id, output=output)
def collect_output_text(item: Dict[str, Any]) -> str:
    """Concatenate the text of every dict-shaped content part, then strip."""
    pieces: List[str] = []
    for part in item.get("content", []):
        if isinstance(part, dict):
            pieces.append(part.get("text", ""))
    return "".join(pieces).strip()
def ensure_api_key(api_key: str) -> str:
    """Return *api_key*, aborting when the hard-coded placeholder was kept.

    Raises SystemExit for an empty key or any key still containing the word
    "replace" (case-insensitive).
    """
    looks_unset = not api_key or "replace" in api_key.lower()
    if looks_unset:
        raise SystemExit("Update API_KEY in mini_codex.py with a valid OpenAI key before running.")
    return api_key
@dataclass
class ToolResult:
    """Outcome of one tool invocation."""

    # Serialized (usually JSON) text handed back to the model.
    output: str
    # False when the tool failed; surfaced as "is_error" in the API payload.
    success: bool = True

    def to_api(self, call_id: str) -> Dict[str, Any]:
        """Shape this result as a submit_tool_outputs entry for *call_id*."""
        return {
            "tool_call_id": call_id,
            "output": self.output,
            "is_error": not self.success,
        }
class MiniCodex:
    """Interactive REPL agent that drives the model/tool loop.

    Keeps the whole conversation in ``self.history`` as Responses-API input
    items (messages, function_calls, function_call_outputs).  Because requests
    are created with ``store=False``, the full history is resent every turn.
    """

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL) -> None:
        self.client = ResponsesClient(api_key, model=model)
        # Flat transcript of API input items, grown in place across turns.
        self.history: List[Dict[str, Any]] = []
        # Tool commands and patches default to the launch directory.
        self.workspace = os.getcwd()

    def run(self) -> None:
        """Read-eval loop: forward each non-empty user line as one model turn."""
        print("Mini Codex ready. Type 'exit' or 'quit' to stop.")
        while True:
            try:
                raw = input("You> ").strip()
            except EOFError:
                # End of stdin: leave quietly.
                print()
                return
            except KeyboardInterrupt:
                # Ctrl-C at the prompt just cancels the current input line.
                print()
                continue
            if raw.lower() in {"exit", "quit"}:
                print("Goodbye.")
                return
            if not raw:
                continue
            self.history.append(make_message("user", raw))
            try:
                self._complete_turn()
            except KeyboardInterrupt:
                # Ctrl-C mid-turn aborts that turn but keeps the session alive.
                print("\n[aborted]")
            except Exception as exc:
                # REPL boundary: report the failure and continue prompting.
                print(f"[error] {exc}")

    def _complete_turn(self) -> None:
        """Run one model turn to completion, executing tool calls as they appear."""
        response = self.client.create(self.history)
        last_status = None
        while True:
            tool_calls = self._gather_tool_calls(response)
            if tool_calls:
                response = self._process_tool_calls(response, tool_calls)
                last_status = None
                continue
            status = response.get("status")
            if status != last_status:
                print(f"[status] {status}")
                last_status = status
            if status == "completed":
                self._record_and_display_output(response)
                return
            if status == "requires_action":
                # NOTE(review): tool_calls is necessarily empty on this branch
                # (non-empty lists were consumed above), so _process_tool_calls
                # returns the same response unchanged and this can spin forever
                # if the API reports requires_action without parseable calls —
                # confirm intended handling.
                response = self._process_tool_calls(response, tool_calls)
                continue
            if status in {"in_progress", "queued"}:
                # Poll every half second until the background response settles.
                time.sleep(0.5)
                response = self.client.retrieve(response["id"])
                continue
            raise RuntimeError(f"Unexpected response status: {status}")

    def _gather_tool_calls(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect tool calls from both API shapes, de-duplicated by call_id.

        Calls may arrive under required_action.submit_tool_outputs (polling
        shape) and/or as function_call items in the output list.
        """
        calls: List[Dict[str, Any]] = []
        seen: set[str] = set()
        action = response.get("required_action", {}) or {}
        submit = action.get("submit_tool_outputs", {}) or {}
        for call in submit.get("tool_calls", []) or []:
            normalized = self._normalize_tool_call(call)
            if normalized and normalized["call_id"] not in seen:
                seen.add(normalized["call_id"])
                calls.append(normalized)
        for item in response.get("output") or []:
            if item.get("type") != "function_call":
                continue
            # Re-shape the output item into the submit_tool_outputs form so a
            # single normalizer handles both sources.
            normalized = {
                "type": "function",
                "function": {
                    "name": item.get("name"),
                    "arguments": item.get("arguments"),
                },
                "id": item.get("id"),
                "call_id": item.get("call_id"),
            }
            normalized = self._normalize_tool_call(normalized)
            if normalized and normalized["call_id"] not in seen:
                seen.add(normalized["call_id"])
                calls.append(normalized)
        return calls

    @staticmethod
    def _normalize_tool_call(call: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Reduce a raw tool-call dict to {call_id, name, arguments}, or None."""
        call_type = call.get("type")
        if call_type != "function":
            return None
        func = call.get("function") or {}
        name = func.get("name")
        arguments = func.get("arguments")
        if name is None or arguments is None:
            return None
        # Fall back to a fresh UUID so history items always carry some id.
        call_id = call.get("call_id") or call.get("id") or str(uuid.uuid4())
        return {"call_id": call_id, "name": name, "arguments": arguments}

    def _process_tool_calls(
        self, response: Dict[str, Any], tool_calls: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Execute each call, append call/output pairs to history, and start a
        follow-up request carrying the results.

        Returns the original *response* unchanged when *tool_calls* is empty.
        """
        outputs: List[Dict[str, Any]] = []
        for call in tool_calls:
            call_id = call["call_id"]
            name = call["name"]
            arguments = call["arguments"] or "{}"
            self.history.append(make_function_call(call_id, name, arguments))
            try:
                parsed = json.loads(arguments)
            except json.JSONDecodeError as err:
                # Malformed JSON arguments become a failed result, not a crash.
                result = ToolResult(_json_dumps({"error": f"invalid arguments: {err}"}), success=False)
            else:
                result = self._dispatch_tool(name, parsed)
            self.history.append(make_function_output(call_id, result.output))
            outputs.append(result.to_api(call_id))
        if outputs:
            # Resend the augmented history rather than using submit_tool_outputs.
            return self.client.create(self.history)
        return response

    def _dispatch_tool(self, name: str, arguments: Dict[str, Any]) -> ToolResult:
        """Route a named tool call to its handler; any exception becomes a
        failed ToolResult so the model sees the error text."""
        try:
            if name == "update_plan":
                return self._tool_plan(arguments)
            if name == "unified_exec":
                return self._tool_unified_exec(arguments)
            if name == "apply_patch":
                return self._tool_apply_patch(arguments)
        except Exception as exc:
            payload = {"error": str(exc)}
            return ToolResult(_json_dumps(payload), success=False)
        payload = {"error": f"unknown tool '{name}'"}
        return ToolResult(_json_dumps(payload), success=False)

    def _record_and_display_output(self, response: Dict[str, Any]) -> None:
        """Print the assistant's final message(s) and append them to history."""
        outputs = response.get("output") or []
        messages: List[str] = []
        for item in outputs:
            if item.get("type") != "message":
                continue
            text = collect_output_text(item)
            if not text:
                continue
            messages.append(text)
        if not messages:
            # Fallback path: assumes "output_text" is a list of plain strings
            # rather than message items — TODO confirm against the API shape.
            text_chunks = [c for c in (response.get("output_text") or []) if isinstance(c, str) and c.strip()]
            if text_chunks:
                messages.append("\n".join(text_chunks))
        for text in messages:
            self.history.append(make_message("assistant", text, kind="output_text"))
        if messages:
            print("Assistant>")
            for block in messages:
                for line in block.splitlines():
                    print(f" {line}")
        else:
            print("[no assistant message]")

    def _tool_plan(self, arguments: Dict[str, Any]) -> ToolResult:
        """Handle update_plan: validate, pretty-print, and echo the plan back.

        Raises ValueError when "plan" is not a list of {step, status} dicts.
        """
        plan = arguments.get("plan")
        if not isinstance(plan, list):
            raise ValueError("update_plan.plan must be a list")
        for entry in plan:
            if not isinstance(entry, dict) or "step" not in entry or "status" not in entry:
                raise ValueError("Each plan item must include 'step' and 'status'")
        explanation = arguments.get("explanation", "")
        print("Plan>")
        for idx, item in enumerate(plan, start=1):
            print(f" {idx}. [{item['status']}] {item['step']}")
        if explanation:
            print(f" note: {explanation}")
        payload = {"plan": plan, "explanation": explanation}
        return ToolResult(_json_dumps(payload))

    def _tool_unified_exec(self, arguments: Dict[str, Any]) -> ToolResult:
        """Handle unified_exec: run one subprocess and return a JSON summary.

        "input" is used directly as the argv list (no shell).  NOTE(review):
        a plain-string input becomes a one-element argv, so a full shell
        command line would be treated as the executable name — confirm intent.
        """
        cmd_input = arguments.get("input")
        if isinstance(cmd_input, list):
            command = [str(part) for part in cmd_input if part is not None]
        elif isinstance(cmd_input, str):
            command = [cmd_input]
        else:
            command = []
        if not command:
            raise ValueError("unified_exec.input must be a non-empty array of strings")
        timeout = arguments.get("timeout_ms")
        timeout_s: Optional[float] = None
        if timeout is not None:
            try:
                timeout_s = float(timeout) / 1000.0
            except (TypeError, ValueError):
                raise ValueError("timeout_ms must be numeric")
        cwd = arguments.get("cwd") or self.workspace
        if arguments.get("session_id"):
            # Persistent sessions are advertised in the tool schema but not
            # implemented here; reject them explicitly.
            return ToolResult(_json_dumps({"error": "session reuse is not supported in mini_codex"}), success=False)
        argv = command
        stdin_data = None
        started = _now()
        try:
            proc = subprocess.run(
                argv,
                input=stdin_data,
                capture_output=True,
                text=True,
                cwd=cwd,
                timeout=timeout_s,
            )
        except subprocess.TimeoutExpired as err:
            duration = _now() - started
            # On timeout, report any partial output; exit_code None marks the kill.
            payload = {
                "stdout": _truncate(err.stdout or ""),
                "stderr": _truncate(err.stderr or ""),
                "timeout": timeout_s,
                "duration_seconds": duration,
                "command": argv,
                "cwd": cwd,
                "exit_code": None,
                "timed_out": True,
            }
            return ToolResult(_json_dumps(payload), success=False)
        except FileNotFoundError:
            raise FileNotFoundError(f"Executable not found: {argv[0]}")
        duration = _now() - started
        stdout = _truncate(proc.stdout)
        stderr = _truncate(proc.stderr)
        payload = {
            "stdout": stdout,
            "stderr": stderr,
            "exit_code": proc.returncode,
            "duration_seconds": duration,
            "command": argv,
            "cwd": cwd,
            "timed_out": False,
        }
        # Mirror the result to the console for the human operator.
        cmd_display = " ".join(shlex_quote(part) for part in argv)
        print(f"Exec> {cmd_display}")
        print(f" exit: {proc.returncode} | time: {duration:.2f}s")
        if stdout:
            print(" stdout:\n " + "\n ".join(stdout.rstrip().splitlines()))
        if stderr:
            print(" stderr:\n " + "\n ".join(stderr.rstrip().splitlines()))
        return ToolResult(_json_dumps(payload), success=proc.returncode == 0)

    def _tool_apply_patch(self, arguments: Dict[str, Any]) -> ToolResult:
        """Handle apply_patch: write the diff to a temp file and run git apply.

        Accepts the patch text under "patch", "input", or "diff"; raises
        ValueError when none is provided.
        """
        patch_text = (
            arguments.get("patch")
            or arguments.get("input")
            or arguments.get("diff")
        )
        if not patch_text:
            raise ValueError("apply_patch requires 'patch' text")
        cwd = arguments.get("cwd") or self.workspace
        create = arguments.get("create_missing_dirs", True)
        # delete=False so git can open the file by path; removed in finally.
        with tempfile.NamedTemporaryFile("w", delete=False) as temp:
            temp.write(patch_text)
            patch_path = temp.name
        try:
            if create:
                self._ensure_directories_from_patch(patch_text, cwd)
            result = subprocess.run(
                ["git", "apply", "--allow-empty", patch_path],
                capture_output=True,
                text=True,
                cwd=cwd,
            )
            success = result.returncode == 0
            stdout = _truncate(result.stdout)
            stderr = _truncate(result.stderr)
            payload = {
                "exit_code": result.returncode,
                "stdout": stdout,
                "stderr": stderr,
                "cwd": cwd,
            }
            print("ApplyPatch>\n " + "\n ".join(patch_text.rstrip().splitlines()))
            if stdout.strip():
                print(" stdout:\n " + "\n ".join(stdout.rstrip().splitlines()))
            if stderr.strip():
                print(" stderr:\n " + "\n ".join(stderr.rstrip().splitlines()))
            return ToolResult(_json_dumps(payload), success=success)
        finally:
            # Best-effort cleanup of the temporary patch file.
            try:
                os.remove(patch_path)
            except OSError:
                pass

    @staticmethod
    def _ensure_directories_from_patch(patch: str, cwd: str) -> None:
        """Pre-create target directories named in ---/+++ diff headers.

        Strips the conventional a/ and b/ prefixes; /dev/null entries
        (new or deleted files) are skipped.
        """
        for line in patch.splitlines():
            if line.startswith("+++ ") or line.startswith("--- "):
                path = line[4:].strip()
                if path in {"a/dev/null", "b/dev/null"}:
                    continue
                if path.startswith("a/") or path.startswith("b/"):
                    path = path[2:]
                directory = os.path.dirname(path)
                if directory:
                    os.makedirs(os.path.join(cwd, directory), exist_ok=True)
def _function_tool(name: str, description: str, parameters: Dict[str, Any], strict: bool = False) -> Dict[str, Any]:
    """Wrap a JSON-schema parameter spec as a Responses-API function tool."""
    tool: Dict[str, Any] = {}
    tool["type"] = "function"
    tool["name"] = name
    tool["description"] = description
    tool["strict"] = strict
    tool["parameters"] = parameters
    return tool
# Tool schemas advertised to the model with every request.  Written as
# literal dicts (same shape and key order a _function_tool call produces)
# so each entry reads as one self-contained definition.
TOOL_DEFS: List[Dict[str, Any]] = [
    {
        "type": "function",
        "name": "update_plan",
        "description": "Updates the shared plan for the current task.",
        "strict": False,
        "parameters": {
            "type": "object",
            "properties": {
                "plan": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "step": {"type": "string"},
                            "status": {"type": "string"},
                        },
                        "required": ["step", "status"],
                        "additionalProperties": False,
                    },
                },
                "explanation": {"type": "string"},
            },
            "required": ["plan"],
            "additionalProperties": False,
        },
    },
    {
        "type": "function",
        "name": "unified_exec",
        "description": "Run a command in a sandboxed shell; reuse session_id to send follow-up input.",
        "strict": False,
        "parameters": {
            "type": "object",
            "properties": {
                "input": {
                    "type": "array",
                    "items": {"type": "string"},
                },
                "session_id": {"type": "string"},
                "timeout_ms": {"type": "number"},
            },
            "required": ["input"],
            "additionalProperties": False,
        },
    },
    {
        "type": "function",
        "name": "apply_patch",
        "description": "Use the apply_patch command to edit files using the Codex patch grammar.",
        "strict": False,
        "parameters": {
            "type": "object",
            "properties": {
                "input": {"type": "string", "description": "Complete apply_patch payload."},
                "cwd": {"type": "string"},
                "create_missing_dirs": {"type": "boolean"},
            },
            "required": ["input"],
            "additionalProperties": False,
        },
    },
]
def main(argv: Iterable[str]) -> int:
    """Program entry point.

    Parameters
    ----------
    argv:
        Command-line arguments; currently unused, accepted only to keep a
        stable entry-point signature.

    Returns
    -------
    int
        Process exit code: 0 on success or Ctrl-C, 1 on any other error.

    Raises
    ------
    SystemExit
        Propagated from ensure_api_key() when the API_KEY placeholder was
        never replaced.  The original code re-raised it from an explicit
        ``except SystemExit`` clause, but SystemExit derives from
        BaseException and is never caught by ``except Exception``, so that
        clause (and its unused ``exc`` binding) was redundant and is removed.
    """
    del argv  # unused; see docstring
    try:
        api_key = ensure_api_key(API_KEY)
        codex = MiniCodex(api_key)
        codex.run()
        return 0
    except KeyboardInterrupt:
        # A bare Ctrl-C at top level is a normal way to leave the REPL.
        print()
        return 0
    except Exception as exc:
        print(f"fatal: {exc}", file=sys.stderr)
        return 1
# Script entry point: run the REPL and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment