Skip to content

Instantly share code, notes, and snippets.

@coderplay
Last active October 14, 2025 22:41
Show Gist options
  • Save coderplay/c26329cedec70f54e83cebc3aa618bbc to your computer and use it in GitHub Desktop.
Save coderplay/c26329cedec70f54e83cebc3aa618bbc to your computer and use it in GitHub Desktop.
A mini version of OpenAI Codex, for study purposes.
#!/usr/bin/env python3
from __future__ import annotations
import json, os, subprocess, sys, tempfile, time, uuid
from shlex import quote as shlex_quote
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional
from urllib import request
from urllib.error import HTTPError, URLError
# The key is read from the OPENAI_API_KEY environment variable when it is set,
# so the secret never has to be written into (or committed with) this file.
# The placeholder below is only a fallback; it contains the word "replace",
# which the start-up key check treats as "not configured".
API_KEY = os.environ.get("OPENAI_API_KEY") or "replace-with-your-openai-api-key-here"

# Base URL that every request path is appended to.
API_ROOT = "https://api.openai.com/v1"

# Model used when the caller does not choose one.
DEFAULT_MODEL = "gpt-5-codex"

# Maximum number of characters of captured tool output kept before truncation.
OUTPUT_LIMIT = 4000

# Sent as the `instructions` field of every model request.
SYSTEM_PROMPT = """You are Codex, an expert software engineering assistant.
Follow a disciplined plan-execute-review loop.
Never fabricate file contents—inspect before editing.
Prefer minimal diffs, inline reasoning, and concise replies.
The user works inside a local Git repository; keep commands reproducible."""
def _json_dumps(data: Any) -> str: return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
def _now() -> float: return time.perf_counter()
def _read_stream(resp: request.addinfourl) -> bytes:
try:
return resp.read()
finally:
resp.close()
class ResponsesClient:
    """Small blocking HTTP client for the `/responses` endpoints under API_ROOT.

    Every transport or protocol failure is reported as ``RuntimeError`` so
    callers only need to handle one exception type.

    Args:
        api_key: Bearer token sent in the ``Authorization`` header.
        model: Model name placed in every create request.
        timeout: Seconds allowed for each blocking socket operation. ``None``
            restores the previous behaviour of waiting indefinitely.
    """

    def __init__(
        self,
        api_key: str,
        model: str = DEFAULT_MODEL,
        timeout: Optional[float] = 600.0,
    ) -> None:
        self.api_key = api_key
        self.model = model
        self.timeout = timeout

    def _headers(self) -> Dict[str, str]:
        """Return the HTTP headers attached to every request."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "OpenAI-Beta": "responses=experimental",
        }

    def _request(self, method: str, path: str, payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send one request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"POST"``.
            path: Path appended to ``API_ROOT`` (must start with ``/``).
            payload: Optional JSON-serializable body.

        Raises:
            RuntimeError: on an HTTP error status, a network failure or
                timeout, or a response body that is not valid UTF-8 JSON.
        """
        data = _json_dumps(payload).encode("utf-8") if payload is not None else None
        req = request.Request(f"{API_ROOT}{path}", data=data, headers=self._headers(), method=method)
        # Only pass a timeout when one is configured so that ``None`` keeps
        # urllib's own default instead of overriding it.
        open_kwargs: Dict[str, Any] = {} if self.timeout is None else {"timeout": self.timeout}
        try:
            # _read_stream closes the response once the body has been read.
            body = _read_stream(request.urlopen(req, **open_kwargs))
        except HTTPError as err:
            error_body = err.read().decode("utf-8", errors="ignore")
            raise RuntimeError(f"OpenAI request failed: {err.code} {err.reason}: {error_body}") from err
        except URLError as err:
            raise RuntimeError(f"Failed to reach OpenAI: {err}") from err
        except OSError as err:
            # Timeouts and resets while *reading* the body are raised as plain
            # OSError subclasses rather than URLError.
            raise RuntimeError(f"Failed to reach OpenAI: {err}") from err
        try:
            return json.loads(body.decode("utf-8"))
        except ValueError as err:
            # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
            raise RuntimeError(f"OpenAI returned an unreadable response: {err}") from err

    def create(self, history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Create a response from the full conversation *history*."""
        payload = {
            "model": self.model,
            "instructions": SYSTEM_PROMPT,
            "input": history,
            "tools": TOOL_DEFS,
            "tool_choice": "auto",
            "parallel_tool_calls": False,
            "store": False,
        }
        return self._request("POST", "/responses", payload)

    def retrieve(self, response_id: str) -> Dict[str, Any]:
        """Fetch the current state of a previously created response."""
        return self._request("GET", f"/responses/{response_id}")
def _truncate(text: Optional[str], limit: int = OUTPUT_LIMIT) -> str:
    """Cap *text* at *limit* characters, appending a note with the number cut.

    ``None`` becomes ``""``; any other empty value is returned unchanged.
    """
    if text is None:
        return ""
    if not text:
        return text
    overflow = len(text) - limit
    if overflow <= 0:
        return text
    return text[:limit] + f"\n... [truncated {overflow} chars]"
def make_message(role: str, text: str, kind: str = "input_text") -> Dict[str, Any]:
    """Build a ``message`` history item holding one content part of type *kind*."""
    content_part = {"type": kind, "text": text}
    return {"type": "message", "role": role, "content": [content_part]}
def make_function_call(call_id: str, name: str, arguments: str) -> Dict[str, Any]:
    """Build the ``function_call`` history item recording a tool request."""
    item: Dict[str, Any] = {"type": "function_call"}
    item["call_id"] = call_id
    item["name"] = name
    item["arguments"] = arguments
    return item
def make_function_output(call_id: str, output: str) -> Dict[str, Any]:
    """Build the ``function_call_output`` history item answering *call_id*."""
    item: Dict[str, Any] = {"type": "function_call_output"}
    item["call_id"] = call_id
    item["output"] = output
    return item
def collect_output_text(item: Dict[str, Any]) -> str:
    """Concatenate the ``text`` of every content part in *item* and strip the result.

    Tolerates a missing or null ``content`` list, parts that are not dicts,
    and parts whose ``text`` is absent or not a string; all of these are
    skipped rather than raising.
    """
    # ``or []`` also covers an explicit ``"content": null``.
    parts = item.get("content") or []
    pieces = [
        part["text"]
        for part in parts
        if isinstance(part, dict) and isinstance(part.get("text"), str)
    ]
    return "".join(pieces).strip()
def ensure_api_key(api_key: str) -> str:
    """Return *api_key* if it looks configured, otherwise exit with instructions.

    A key is rejected when it is empty or contains the word "replace"
    (case-insensitive), which is how the shipped placeholder is recognised.

    Raises:
        SystemExit: when the key is empty or still the placeholder.
    """
    if api_key and "replace" not in api_key.lower():
        return api_key
    raise SystemExit("Update API_KEY in mini_codex.py with a valid OpenAI key before running.")
@dataclass
class ToolResult:
    """Outcome of one tool invocation."""

    # Text handed back to the model as the tool's output.
    output: str
    # False when the tool failed; surfaced as ``is_error`` by to_api().
    success: bool = True

    def to_api(self, call_id: str) -> Dict[str, Any]:
        """Return this result as a tool-output dict tagged with *call_id*."""
        api_item: Dict[str, Any] = {"tool_call_id": call_id, "output": self.output}
        api_item["is_error"] = not self.success
        return api_item
class MiniCodex:
    """Interactive terminal agent: reads prompts, queries the model, runs its tool calls.

    Attributes:
        client: ResponsesClient used for every API request.
        history: Conversation items (messages, function calls and their
            outputs) that are re-sent in full with each request.
        workspace: Working directory captured at construction; the default
            ``cwd`` for tools.
    """

    def __init__(self, api_key: str, model: str = DEFAULT_MODEL) -> None:
        self.client = ResponsesClient(api_key, model=model)
        self.history: List[Dict[str, Any]] = []
        self.workspace = os.getcwd()

    def run(self) -> None:
        """Run the read-eval-print loop until EOF or an ``exit``/``quit`` command."""
        print("Mini Codex ready. Type 'exit' or 'quit' to stop.")
        while True:
            try:
                raw = input("You> ").strip()
            except EOFError:
                print()
                return
            except KeyboardInterrupt:
                # Ctrl-C at the prompt just discards the current line.
                print()
                continue
            if raw.lower() in {"exit", "quit"}:
                print("Goodbye.")
                return
            if not raw:
                continue
            self.history.append(make_message("user", raw))
            try:
                self._complete_turn()
            except KeyboardInterrupt:
                print("\n[aborted]")
            except Exception as exc:
                # Top-level boundary: report and keep the session alive.
                print(f"[error] {exc}")

    def _complete_turn(self) -> None:
        """Drive one user turn: request a response, run tools, poll until done.

        Raises:
            RuntimeError: when the response ends in a status this loop cannot
                make progress on.
        """
        response = self.client.create(self.history)
        last_status = None
        while True:
            tool_calls = self._gather_tool_calls(response)
            if tool_calls:
                response = self._process_tool_calls(response, tool_calls)
                last_status = None
                continue
            status = response.get("status")
            if status != last_status:
                print(f"[status] {status}")
                last_status = status
            if status == "completed":
                self._record_and_display_output(response)
                return
            if status == "requires_action":
                # No tool call could be extracted, so re-processing the same
                # response would loop forever without making progress.
                raise RuntimeError("Response requires action but contains no usable tool calls")
            if status in {"in_progress", "queued"}:
                time.sleep(0.5)
                response = self.client.retrieve(response["id"])
                continue
            detail = response.get("error") or response.get("incomplete_details")
            suffix = f" ({_json_dumps(detail)})" if detail else ""
            raise RuntimeError(f"Unexpected response status: {status}{suffix}")

    def _gather_tool_calls(self, response: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Collect the tool calls in *response*, de-duplicated by ``call_id``.

        Looks in ``required_action.submit_tool_outputs.tool_calls`` and at
        ``function_call`` items in ``output``. Entries that are not dicts or
        cannot be normalized are ignored.
        """
        calls: List[Dict[str, Any]] = []
        seen: set = set()

        def remember(candidate: Dict[str, Any]) -> None:
            normalized = self._normalize_tool_call(candidate)
            if normalized and normalized["call_id"] not in seen:
                seen.add(normalized["call_id"])
                calls.append(normalized)

        action = response.get("required_action") or {}
        submit = action.get("submit_tool_outputs") or {}
        for call in submit.get("tool_calls") or []:
            if isinstance(call, dict):
                remember(call)
        for item in response.get("output") or []:
            if not isinstance(item, dict) or item.get("type") != "function_call":
                continue
            # Reshape the flat output item into the nested form that
            # _normalize_tool_call expects.
            remember(
                {
                    "type": "function",
                    "function": {
                        "name": item.get("name"),
                        "arguments": item.get("arguments"),
                    },
                    "id": item.get("id"),
                    "call_id": item.get("call_id"),
                }
            )
        return calls

    @staticmethod
    def _normalize_tool_call(call: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Reduce *call* to ``{"call_id", "name", "arguments"}``.

        Returns ``None`` when the call is not of type ``function`` or lacks a
        name or arguments. A random UUID stands in for a missing call id.
        """
        call_type = call.get("type")
        if call_type != "function":
            return None
        func = call.get("function") or {}
        name = func.get("name")
        arguments = func.get("arguments")
        if name is None or arguments is None:
            return None
        call_id = call.get("call_id") or call.get("id") or str(uuid.uuid4())
        return {"call_id": call_id, "name": name, "arguments": arguments}

    def _process_tool_calls(
        self, response: Dict[str, Any], tool_calls: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """Execute *tool_calls*, record them in history and request the next response.

        Returns *response* unchanged when there is nothing to execute.
        """
        if not tool_calls:
            return response
        for call in tool_calls:
            call_id = call["call_id"]
            name = call["name"]
            arguments = call["arguments"] or "{}"
            if not isinstance(arguments, str):
                # History items carry arguments as a JSON string.
                arguments = _json_dumps(arguments)
            try:
                parsed = json.loads(arguments)
            except json.JSONDecodeError as err:
                result = ToolResult(_json_dumps({"error": f"invalid arguments: {err}"}), success=False)
            else:
                if isinstance(parsed, dict):
                    result = self._dispatch_tool(name, parsed)
                else:
                    result = ToolResult(
                        _json_dumps({"error": "invalid arguments: expected a JSON object"}),
                        success=False,
                    )
            # Record the call and its output together, only after the tool has
            # finished: an interrupt mid-tool must not leave a call without an
            # output in the history that is re-sent on later requests.
            self.history.append(make_function_call(call_id, name, arguments))
            self.history.append(make_function_output(call_id, result.output))
        return self.client.create(self.history)

    def _dispatch_tool(self, name: str, arguments: Dict[str, Any]) -> ToolResult:
        """Run the tool called *name*; any failure becomes an error ToolResult."""
        try:
            if name == "update_plan":
                return self._tool_plan(arguments)
            if name == "unified_exec":
                return self._tool_unified_exec(arguments)
            if name == "apply_patch":
                return self._tool_apply_patch(arguments)
        except Exception as exc:
            # Tool failures are reported back to the model, not raised.
            payload = {"error": str(exc)}
            return ToolResult(_json_dumps(payload), success=False)
        payload = {"error": f"unknown tool '{name}'"}
        return ToolResult(_json_dumps(payload), success=False)

    def _record_and_display_output(self, response: Dict[str, Any]) -> None:
        """Append the assistant's message text to history and print it."""
        outputs = response.get("output") or []
        messages: List[str] = []
        for item in outputs:
            if not isinstance(item, dict) or item.get("type") != "message":
                continue
            text = collect_output_text(item)
            if text:
                messages.append(text)
        if not messages:
            fallback = response.get("output_text")
            if isinstance(fallback, str):
                # A plain string must not be iterated character by character.
                fallback = [fallback]
            text_chunks = [c for c in (fallback or []) if isinstance(c, str) and c.strip()]
            if text_chunks:
                messages.append("\n".join(text_chunks))
        for text in messages:
            self.history.append(make_message("assistant", text, kind="output_text"))
        if messages:
            print("Assistant>")
            for block in messages:
                for line in block.splitlines():
                    print(f" {line}")
        else:
            print("[no assistant message]")

    def _tool_plan(self, arguments: Dict[str, Any]) -> ToolResult:
        """Validate and print the plan, then echo it back as the tool output.

        Raises:
            ValueError: if ``plan`` is not a list of dicts that each contain
                ``step`` and ``status``.
        """
        plan = arguments.get("plan")
        if not isinstance(plan, list):
            raise ValueError("update_plan.plan must be a list")
        for entry in plan:
            if not isinstance(entry, dict) or "step" not in entry or "status" not in entry:
                raise ValueError("Each plan item must include 'step' and 'status'")
        explanation = arguments.get("explanation", "")
        print("Plan>")
        for idx, item in enumerate(plan, start=1):
            print(f" {idx}. [{item['status']}] {item['step']}")
        if explanation:
            print(f" note: {explanation}")
        payload = {"plan": plan, "explanation": explanation}
        return ToolResult(_json_dumps(payload))

    @staticmethod
    def _as_text(stream: Any) -> str:
        """Coerce captured process output (``None``, bytes or str) to ``str``."""
        if stream is None:
            return ""
        if isinstance(stream, bytes):
            return stream.decode("utf-8", errors="replace")
        return stream

    def _tool_unified_exec(self, arguments: Dict[str, Any]) -> ToolResult:
        """Run a command without a shell and report stdout, stderr and exit code.

        Reads ``input`` (argv list, or a single string used as the program
        name), optional ``timeout_ms``, optional ``cwd`` and ``session_id``
        (rejected: sessions are not supported).

        Raises:
            ValueError: for an empty command or an invalid ``timeout_ms``.
            FileNotFoundError: if the working directory or executable is missing.
        """
        cmd_input = arguments.get("input")
        if isinstance(cmd_input, list):
            command = [str(part) for part in cmd_input if part is not None]
        elif isinstance(cmd_input, str):
            command = [cmd_input]
        else:
            command = []
        if not command:
            raise ValueError("unified_exec.input must be a non-empty array of strings")
        timeout = arguments.get("timeout_ms")
        timeout_s: Optional[float] = None
        if timeout is not None:
            try:
                timeout_s = float(timeout) / 1000.0
            except (TypeError, ValueError):
                raise ValueError("timeout_ms must be numeric") from None
            if not timeout_s > 0:
                # Zero or negative would expire immediately; NaN also lands here.
                raise ValueError("timeout_ms must be positive")
        cwd = arguments.get("cwd") or self.workspace
        if arguments.get("session_id"):
            return ToolResult(_json_dumps({"error": "session reuse is not supported in mini_codex"}), success=False)
        if not os.path.isdir(cwd):
            # Checked up front so that a missing cwd is not misreported below
            # as a missing executable.
            raise FileNotFoundError(f"Working directory not found: {cwd}")
        argv = command
        cmd_display = " ".join(shlex_quote(part) for part in argv)
        started = _now()
        try:
            proc = subprocess.run(
                argv,
                # Do not let the child read from the REPL's terminal.
                stdin=subprocess.DEVNULL,
                capture_output=True,
                text=True,
                # Undecodable bytes in the output must not abort the tool call.
                errors="replace",
                cwd=cwd,
                timeout=timeout_s,
            )
        except subprocess.TimeoutExpired as err:
            duration = _now() - started
            payload = {
                # The partial output on TimeoutExpired may be bytes even in
                # text mode, so normalize it before truncating/serializing.
                "stdout": _truncate(self._as_text(err.stdout)),
                "stderr": _truncate(self._as_text(err.stderr)),
                "timeout": timeout_s,
                "duration_seconds": duration,
                "command": argv,
                "cwd": cwd,
                "exit_code": None,
                "timed_out": True,
            }
            print(f"Exec> {cmd_display}")
            print(f" timed out after {duration:.2f}s")
            return ToolResult(_json_dumps(payload), success=False)
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Executable not found: {argv[0]}") from err
        duration = _now() - started
        stdout = _truncate(proc.stdout)
        stderr = _truncate(proc.stderr)
        payload = {
            "stdout": stdout,
            "stderr": stderr,
            "exit_code": proc.returncode,
            "duration_seconds": duration,
            "command": argv,
            "cwd": cwd,
            "timed_out": False,
        }
        print(f"Exec> {cmd_display}")
        print(f" exit: {proc.returncode} | time: {duration:.2f}s")
        if stdout:
            print(" stdout:\n " + "\n ".join(stdout.rstrip().splitlines()))
        if stderr:
            print(" stderr:\n " + "\n ".join(stderr.rstrip().splitlines()))
        return ToolResult(_json_dumps(payload), success=proc.returncode == 0)

    def _tool_apply_patch(self, arguments: Dict[str, Any]) -> ToolResult:
        """Apply a patch to the workspace with ``git apply``.

        The patch text is taken from ``patch``, ``input`` or ``diff`` (first
        non-empty). ``cwd`` overrides the workspace; ``create_missing_dirs``
        (default true) pre-creates directories named by the patch.

        Raises:
            ValueError: if no patch text is supplied or it is not a string.
        """
        patch_text = (
            arguments.get("patch")
            or arguments.get("input")
            or arguments.get("diff")
        )
        if not patch_text:
            raise ValueError("apply_patch requires 'patch' text")
        if not isinstance(patch_text, str):
            raise ValueError("apply_patch 'patch' must be a string")
        if not patch_text.endswith("\n"):
            # Patches are line-oriented; make sure the last line is terminated.
            patch_text += "\n"
        cwd = arguments.get("cwd") or self.workspace
        create = arguments.get("create_missing_dirs", True)
        # Explicit UTF-8 and newline="" write the patch bytes exactly as
        # given, independent of the platform's locale and newline translation.
        temp = tempfile.NamedTemporaryFile(
            "w", delete=False, encoding="utf-8", newline="", suffix=".patch"
        )
        patch_path = temp.name
        try:
            with temp:
                temp.write(patch_text)
            if create:
                self._ensure_directories_from_patch(patch_text, cwd)
            result = subprocess.run(
                ["git", "apply", "--allow-empty", patch_path],
                capture_output=True,
                text=True,
                errors="replace",
                cwd=cwd,
            )
            success = result.returncode == 0
            stdout = _truncate(result.stdout)
            stderr = _truncate(result.stderr)
            payload = {
                "exit_code": result.returncode,
                "stdout": stdout,
                "stderr": stderr,
                "cwd": cwd,
            }
            print("ApplyPatch>\n " + "\n ".join(patch_text.rstrip().splitlines()))
            if stdout.strip():
                print(" stdout:\n " + "\n ".join(stdout.rstrip().splitlines()))
            if stderr.strip():
                print(" stderr:\n " + "\n ".join(stderr.rstrip().splitlines()))
            return ToolResult(_json_dumps(payload), success=success)
        finally:
            # Best-effort cleanup; the file is removed even if writing failed.
            try:
                os.remove(patch_path)
            except OSError:
                pass

    @staticmethod
    def _ensure_directories_from_patch(patch: str, cwd: str) -> None:
        """Create, under *cwd*, the parent directories of files the patch writes.

        Only ``+++ `` header lines are considered: the ``--- `` side names a
        file that must already exist, and removed lines that begin with
        ``-- `` also show up as ``--- ...`` inside hunks. ``/dev/null`` and
        any path that would resolve outside *cwd* are skipped.
        """
        root = os.path.realpath(cwd)
        for line in patch.splitlines():
            if not line.startswith("+++ "):
                continue
            # Drop an optional tab-separated timestamp after the file name.
            path = line[4:].split("\t", 1)[0].strip()
            if path in {"/dev/null", "a/dev/null", "b/dev/null"}:
                continue
            if path.startswith(("a/", "b/")):
                path = path[2:]
            directory = os.path.dirname(path)
            if not directory:
                continue
            target = os.path.realpath(os.path.join(root, directory))
            try:
                inside = os.path.commonpath([root, target]) == root
            except ValueError:
                # e.g. paths on different drives
                inside = False
            if inside:
                os.makedirs(target, exist_ok=True)
def _function_tool(name: str, description: str, parameters: Dict[str, Any], strict: bool = False) -> Dict[str, Any]:
return {
"type": "function",
"name": name,
"description": description,
"strict": strict,
"parameters": parameters,
}
# Tool schemas advertised to the model with every request. The names must
# match the ones handled by MiniCodex._dispatch_tool.
TOOL_DEFS: List[Dict[str, Any]] = [
    _function_tool(
        "update_plan",
        "Updates the shared plan for the current task.",
        {
            "type": "object",
            "properties": {
                "plan": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "step": {"type": "string"},
                            "status": {"type": "string"},
                        },
                        "required": ["step", "status"],
                        "additionalProperties": False,
                    },
                },
                "explanation": {"type": "string"},
            },
            "required": ["plan"],
            "additionalProperties": False,
        },
    ),
    # NOTE(review): the description promises a sandbox and session reuse, but
    # the handler runs the command directly with subprocess and rejects any
    # session_id — confirm which side should change.
    _function_tool(
        "unified_exec",
        "Run a command in a sandboxed shell; reuse session_id to send follow-up input.",
        {
            "type": "object",
            "properties": {
                "input": {
                    "type": "array",
                    "items": {"type": "string"},
                },
                "session_id": {"type": "string"},
                "timeout_ms": {"type": "number"},
                # The handler honours an optional working directory; declare
                # it so the closed schema (additionalProperties: False) lets
                # the model send it, as apply_patch already does.
                "cwd": {"type": "string"},
            },
            "required": ["input"],
            "additionalProperties": False,
        },
    ),
    # NOTE(review): the description refers to the "Codex patch grammar", while
    # the handler feeds the payload to `git apply` and scans it for `+++ `
    # headers, i.e. it expects a unified diff — confirm the intended format.
    _function_tool(
        "apply_patch",
        "Use the apply_patch command to edit files using the Codex patch grammar.",
        {
            "type": "object",
            "properties": {
                "input": {"type": "string", "description": "Complete apply_patch payload."},
                "cwd": {"type": "string"},
                "create_missing_dirs": {"type": "boolean"},
            },
            "required": ["input"],
            "additionalProperties": False,
        },
    ),
]
def main(argv: Iterable[str]) -> int:
    """Start an interactive session and return the process exit status.

    *argv* is accepted for the conventional entry-point signature but is not
    read. Returns 0 on a normal exit or Ctrl-C and 1 on an unexpected error
    (reported on stderr). ``SystemExit`` — e.g. from the API-key check — is
    left to propagate to the interpreter.
    """
    exit_code = 0
    try:
        key = ensure_api_key(API_KEY)
        session = MiniCodex(key)
        session.run()
    except KeyboardInterrupt:
        print()
    except Exception as exc:
        # SystemExit is not an Exception subclass, so it passes straight through.
        print(f"fatal: {exc}", file=sys.stderr)
        exit_code = 1
    return exit_code
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main(sys.argv))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment