Last active
April 20, 2026 15:29
-
-
Save samuelcolvin/bae1f0017137239396283270325ea19c to your computer and use it in GitHub Desktop.
CLI for hackmonty.com
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env -S uv run --script | |
| # /// script | |
| # requires-python = ">=3.14" | |
| # dependencies = ["httpx", "pydantic", "rich"] | |
| # /// | |
| """CLI for https://hackmonty.com. | |
| Submits a local Python file to `/run/` and drives the pause/resume loop end to | |
| end. A second local Python file may supply module-level names used to resolve | |
| external function calls and name lookups the sandbox hands back to the client. | |
| NOTE: This script was almost entirely written by claude - blame uncle dario. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import hashlib | |
| import importlib.util | |
| import inspect | |
| import subprocess | |
| import sys | |
| import tempfile | |
| import time | |
| from pathlib import Path | |
| from types import ModuleType | |
| from typing import Annotated, Any, Literal, TypeAlias, cast | |
| from uuid import UUID | |
| import httpx | |
| from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter | |
| from rich.console import Console | |
| from rich.pretty import Pretty | |
async def main(
    code: str,
    globals: ModuleType,
    *,
    base_url: str = 'https://hackmonty.com',
    console: Console | None = None,
    user_secret: str | None = None,
    type_check: bool = False,
    type_check_stubs: str | None = None,
) -> Result:
    """Run `code` against hackmonty, resolving snapshots from `globals`.

    If `console` is provided, progress is printed to it for every external
    function call and name lookup resolved by the client. If `user_secret`
    is provided, its SHA-256 hex digest is sent as a `User` header on every
    request so the Pydantic team can later verify it was you who found the
    secret — see the "Request secret" section of `instructions.md`.

    If `type_check` is true the server runs pyright over `code` using
    `type_check_stubs` (generated by `stubgen` from the globals file).

    Returns:
        The terminal response of the run (completed, syntax error, typing
        error or runtime error).
    """
    # The header/client setup lives in `_main_with_final_dt`; delegate to it so
    # the two entry points cannot drift apart, and drop the timing it returns.
    result, _ = await _main_with_final_dt(
        code,
        globals,
        base_url=base_url,
        console=console,
        user_secret=user_secret,
        type_check=type_check,
        type_check_stubs=type_check_stubs,
    )
    return result
async def _main_with_final_dt(
    code: str,
    globals: ModuleType,
    *,
    base_url: str,
    console: Console | None,
    user_secret: str | None,
    type_check: bool,
    type_check_stubs: str | None,
) -> tuple[Result, float]:
    """Run `code` and return the final result together with the last request's duration.

    Opens an `httpx.AsyncClient` without a timeout and hands it to `_dispatch`.
    When `user_secret` is given, its SHA-256 hex digest is attached to the
    client as a `User` header.

    Returns:
        The `(result, dt)` pair produced by `_dispatch`.
    """
    auth_headers: dict[str, str] | None = None
    if user_secret is not None:
        digest = hashlib.sha256(user_secret.encode()).hexdigest()
        auth_headers = {'User': digest}

    async with httpx.AsyncClient(timeout=None, headers=auth_headers) as http:
        outcome = await _dispatch(http, base_url, code, globals, console, type_check, type_check_stubs)
    return outcome
def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for the hackmonty command line."""
    parser = argparse.ArgumentParser(description='Run a Python script against https://hackmonty.com.')
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument('-c', '--code', help="Python source to run, e.g. --code '1 + 1'.")
    source.add_argument('-s', '--script', help='Path to a Python file to submit to /run/.')
    parser.add_argument(
        '-g',
        '--globals',
        dest='globals_path',
        default=None,
        help='Optional Python file whose module-level names resolve sandbox lookups.',
    )
    parser.add_argument('--base-url', default='https://hackmonty.com', help='Hackmonty base URL.')
    parser.add_argument(
        '--user-secret',
        dest='user_secret',
        default=None,
        help='Plaintext secret known only to you; its SHA-256 is sent as a User header so Pydantic can verify the finder.',
    )
    parser.add_argument(
        '-t',
        '--type-check',
        dest='type_check',
        action='store_true',
        help='Ask the server to type-check the submitted code. When combined with --globals, '
        'stubgen runs over the globals file and the resulting .pyi is sent as type_check_stubs.',
    )
    return parser


def _render_result(console: Console, result: Result, dt: float) -> int:
    """Print the terminal `result` and return the process exit code (0 only on completion)."""
    if isinstance(result, Complete):
        if result.print_output:
            console.print('[bold]stdout:[/bold]')
            console.out(
                result.print_output,
                end='' if result.print_output.endswith('\n') else '\n',
                highlight=False,
            )
        console.print(f'{_fmt_dt(dt)} [bold]output:[/bold]')
        console.print(Pretty(result.output, indent_size=2))
        return 0
    if isinstance(result, RuntimeErrorResult):
        if result.print_output:
            sys.stdout.write(result.print_output)
            if not result.print_output.endswith('\n'):
                sys.stdout.write('\n')
        console.print(f'{_fmt_dt(dt)} [bold red]runtime error:[/bold red] {result.error}')
        if result.traceback:
            print(result.traceback, file=sys.stderr)
        return 1
    if isinstance(result, SyntaxErrorResult):
        console.print(f'{_fmt_dt(dt)} [bold red]syntax error:[/bold red] {result.error}')
        return 1
    # Only TypingErrorResult remains in the `Result` union at this point.
    console.print(f'{_fmt_dt(dt)} [bold red]typing error:[/bold red] {result.error}')
    return 1


def cli() -> int:
    """Command-line entry point: parse arguments, run the code, and report the outcome.

    The code comes from `--code` or from the file given by `--script`. Names
    from the optional `--globals` file resolve the sandbox's external calls and
    name lookups. With `--type-check` and `--globals` together, stubs generated
    from the globals file are sent along with the code.

    Returns:
        0 when the run completed, 1 for a runtime, syntax or typing error.
    """
    args = _build_parser().parse_args()

    globals_path = cast(str | None, args.globals_path)
    globals_module = _load_globals(globals_path)
    console = Console()
    type_check = cast(bool, args.type_check)
    type_check_stubs = _generate_stubs(globals_path) if type_check and globals_path else None

    code = cast(str | None, args.code)
    if code is None:
        # Python source is UTF-8 by default; do not depend on the platform's locale encoding.
        code = Path(cast(str, args.script)).read_text(encoding='utf-8')

    result, dt = asyncio.run(
        _main_with_final_dt(
            code,
            globals_module,
            base_url=cast(str, args.base_url),
            console=console,
            user_secret=cast(str | None, args.user_secret),
            type_check=type_check,
            type_check_stubs=type_check_stubs,
        )
    )
    return _render_result(console, result, dt)
| def _decode_monty_json(value: Any) -> Any: | |
| """Decode monty's natural-form JSON into native Python values. | |
| Non-JSON-native Python values are wire-encoded as `{"$<tag>": ...}` objects | |
| by `pydantic_monty` — see `MontyComplete.output_json`. | |
| """ | |
| if isinstance(value, list): | |
| items = cast(list[Any], value) | |
| return [_decode_monty_json(v) for v in items] | |
| if not isinstance(value, dict): | |
| return value | |
| obj = cast(dict[str, Any], value) | |
| if '$tuple' in obj: | |
| return tuple(_decode_monty_json(v) for v in cast(list[Any], obj['$tuple'])) | |
| elif '$set' in obj: | |
| return {_decode_monty_json(v) for v in cast(list[Any], obj['$set'])} | |
| elif '$frozenset' in obj: | |
| return frozenset(_decode_monty_json(v) for v in cast(list[Any], obj['$frozenset'])) | |
| elif '$bytes' in obj: | |
| return bytes(cast(list[int], obj['$bytes'])) | |
| elif '$dict' in obj: | |
| pairs = cast(list[list[Any]], obj['$dict']) | |
| return {_decode_monty_json(k): _decode_monty_json(v) for k, v in pairs} | |
| elif '$ellipsis' in obj: | |
| return ... | |
| elif '$float' in obj: | |
| return float(cast(str, obj['$float'])) | |
| elif '$dataclass' in obj: | |
| raise ValueError(f'cannot decode dataclass {obj.get("name")!r}: no class definition available on the client') | |
| else: | |
| return {k: _decode_monty_json(v) for k, v in obj.items()} | |
# A field of any type whose raw value is passed through `_decode_monty_json`
# before pydantic validates it.
MontyEncoded: TypeAlias = Annotated[
    Any,
    BeforeValidator(_decode_monty_json),
]
class Complete(BaseModel):
    """Terminal response, tagged `kind='completed'`."""

    kind: Literal['completed'] = 'completed'
    # Decoded from monty's wire encoding before validation.
    output: MontyEncoded
    # Shown under a "stdout:" heading by the CLI when non-empty.
    print_output: str
class SyntaxErrorResult(BaseModel):
    """Terminal response, tagged `kind='syntax_error'`, carrying an error message."""

    kind: Literal['syntax_error'] = 'syntax_error'
    error: str
class TypingErrorResult(BaseModel):
    """Terminal response, tagged `kind='typing_error'`, carrying an error message."""

    kind: Literal['typing_error'] = 'typing_error'
    error: str
class RuntimeErrorResult(BaseModel):
    """Terminal response, tagged `kind='runtime_error'`."""

    kind: Literal['runtime_error'] = 'runtime_error'
    error: str
    # Both optional; the CLI prints each one only when it is non-empty.
    traceback: str | None = None
    print_output: str | None = None
# The four terminal responses, discriminated by their `kind` field.
Result: TypeAlias = Annotated[
    Complete | SyntaxErrorResult | TypingErrorResult | RuntimeErrorResult,
    Field(discriminator='kind'),
]
async def _dispatch(
    client: httpx.AsyncClient,
    base_url: str,
    code: str,
    globals_module: ModuleType,
    console: Console | None,
    type_check: bool,
    type_check_stubs: str | None,
) -> tuple[Result, float]:
    """Submit `code` to `/run/` and answer snapshots until a terminal response arrives.

    Each function, name-lookup or future snapshot is passed to its handler,
    whose reply becomes the next response to inspect. Tasks started for
    coroutine-returning functions are kept in a dict keyed by snapshot id;
    whatever is still in it on exit is cancelled and awaited.

    Returns:
        The terminal response and the duration of the last request.
    """
    in_flight: dict[UUID, asyncio.Task[ReturnValue | ExceptionValue]] = {}
    try:
        request = MontyRun(code=code, type_check=type_check, type_check_stubs=type_check_stubs)
        progress, dt = await _post_json(client, f'{base_url}/run/', request)
        while True:
            if isinstance(progress, FunctionSnapshotMsg):
                progress, dt = await _handle_function(
                    client, base_url, progress, globals_module, in_flight, console, dt
                )
                continue
            if isinstance(progress, NameLookupSnapshotMsg):
                progress, dt = await _handle_name_lookup(client, base_url, progress, globals_module, console, dt)
                continue
            if isinstance(progress, FutureSnapshotMsg):
                progress, dt = await _handle_future(client, base_url, progress, in_flight)
                continue
            # Anything that is not a snapshot is one of the terminal results.
            return progress, dt
    finally:
        for leftover in in_flight.values():
            leftover.cancel()
        # Await the cancelled tasks; their outcomes are collected, not raised.
        await asyncio.gather(*in_flight.values(), return_exceptions=True)
async def _handle_function(
    client: httpx.AsyncClient,
    base_url: str,
    snapshot: FunctionSnapshotMsg,
    globals_module: ModuleType,
    tasks: dict[UUID, asyncio.Task[ReturnValue | ExceptionValue]],
    console: Console | None,
    prev_dt: float,
) -> tuple[_RunResponse, float]:
    """Resolve an external function call and resume the run with its outcome.

    `snapshot.function_name` is looked up on `globals_module` and called with
    the snapshot's args and kwargs. The value posted back is one of:

    * a `LookupError` exception value when the name is missing,
    * an exception value when the call raises,
    * the literal `'future'` when the call returns a coroutine; the coroutine
      is scheduled as a task and stored in `tasks` under the snapshot id,
    * otherwise the returned value.

    When `console` is given, one progress line is printed per call.
    """
    target = cast(Any, getattr(globals_module, snapshot.function_name, _MISSING))
    call = _format_call(snapshot.function_name, snapshot.args, snapshot.kwargs)
    prefix = _fmt_dt(prev_dt)
    header = f'{prefix} [bold]function call:[/bold] {call}'
    resume_url = f'{base_url}/run/{snapshot.snapshot_id}/'

    outcome: ExternalWebResult
    if target is _MISSING:
        if console is not None:
            console.print(f'{header} [red]not found[/red]')
        outcome = ExceptionValue(
            exception_type='LookupError',
            exception_message=f"Unable to find '{snapshot.function_name}' in globals module",
        )
        return await _post_json(client, resume_url, MontyResumeFunction(result=outcome))

    try:
        returned = target(*snapshot.args, **snapshot.kwargs)
    except Exception as exc:
        if console is not None:
            console.print(f'{header} [red]raised {type(exc).__name__}:[/red] {exc}')
        outcome = _as_exception(exc)
    else:
        if not inspect.iscoroutine(returned):
            if console is not None:
                console.print(f'{header} → {returned!r}')
            outcome = ReturnValue(return_value=returned)
        else:
            if console is not None:
                console.print(f'{header} [dim]→ future[/dim]')
            # Run the coroutine in the background; `_handle_future` collects it later.
            tasks[snapshot.snapshot_id] = asyncio.create_task(_await_and_wrap(returned))
            outcome = 'future'

    return await _post_json(client, resume_url, MontyResumeFunction(result=outcome))
async def _handle_name_lookup(
    client: httpx.AsyncClient,
    base_url: str,
    snapshot: NameLookupSnapshotMsg,
    globals_module: ModuleType,
    console: Console | None,
    prev_dt: float,
) -> tuple[_RunResponse, float]:
    """Resolve a name lookup against `globals_module` and resume the run.

    Posts the attribute's value wrapped in `ReturnValue`, or `None` when
    `globals_module` has no attribute called `snapshot.name`. When `console`
    is given, one progress line is printed.
    """
    found = cast(Any, getattr(globals_module, snapshot.name, _MISSING))
    missing = found is _MISSING

    if console is not None:
        prefix = _fmt_dt(prev_dt)
        outcome = '[red]not found[/red]' if missing else f'→ {found!r}'
        console.print(f'{prefix} [bold]name lookup:[/bold] {snapshot.name} {outcome}')

    payload = MontyResumeNameLookup(value=None if missing else ReturnValue(return_value=found))
    resume_url = f'{base_url}/run/{snapshot.snapshot_id}/'
    return await _post_json(client, resume_url, payload)
async def _handle_future(
    client: httpx.AsyncClient,
    base_url: str,
    snapshot: FutureSnapshotMsg,
    tasks: dict[UUID, asyncio.Task[ReturnValue | ExceptionValue]],
) -> tuple[_RunResponse, float]:
    """Wait for at least one pending task to finish and resume the run with the results.

    Only the ids in `snapshot.pending_snapshot_ids` that are present in `tasks`
    are waited on. Every task that has finished by the time the wait returns
    is reported and removed from `tasks`.

    Raises:
        RuntimeError: if none of the pending ids has a task in `tasks`.
    """
    # Map each awaited task back to its snapshot id.
    waiting = {tasks[sid]: sid for sid in snapshot.pending_snapshot_ids if sid in tasks}
    if not waiting:
        raise RuntimeError(f'FutureSnapshot references unknown snapshot_ids: {snapshot.pending_snapshot_ids}')

    finished, _ = await asyncio.wait(list(waiting), return_when=asyncio.FIRST_COMPLETED)

    results: dict[UUID, ExternalWebResult] = {}
    for task in finished:
        sid = waiting[task]
        results[sid] = task.result()
        del tasks[sid]

    resume_url = f'{base_url}/run/{snapshot.snapshot_id}/'
    return await _post_json(client, resume_url, MontyResumeFuture(results=results))
class FunctionSnapshotMsg(BaseModel):
    """Response tagged `kind='function_snapshot'`: an external function call to resolve."""

    kind: Literal['function_snapshot'] = 'function_snapshot'
    # Identifies the `/run/{snapshot_id}/` URL the answer is posted to.
    snapshot_id: UUID
    function_name: str
    # Both are decoded from monty's wire encoding before validation.
    args: Annotated[list[Any], BeforeValidator(_decode_monty_json)]
    kwargs: Annotated[dict[str, Any], BeforeValidator(_decode_monty_json)]
    print_output: str
class NameLookupSnapshotMsg(BaseModel):
    """Response tagged `kind='name_lookup_snapshot'`: a name to resolve on the client."""

    kind: Literal['name_lookup_snapshot'] = 'name_lookup_snapshot'
    # Identifies the `/run/{snapshot_id}/` URL the answer is posted to.
    snapshot_id: UUID
    name: str
    print_output: str
class FutureSnapshotMsg(BaseModel):
    """Response tagged `kind='future_snapshot'`: the run is waiting on pending futures."""

    kind: Literal['future_snapshot'] = 'future_snapshot'
    # Identifies the `/run/{snapshot_id}/` URL the answer is posted to.
    snapshot_id: UUID
    # Snapshot ids of earlier function calls that were answered with 'future'.
    pending_snapshot_ids: list[UUID]
    print_output: str
# Every response shape handled by this client (snapshots and terminal results),
# discriminated by the `kind` field.
_RunResponse: TypeAlias = Annotated[
    Complete
    | FunctionSnapshotMsg
    | NameLookupSnapshotMsg
    | FutureSnapshotMsg
    | SyntaxErrorResult
    | TypingErrorResult
    | RuntimeErrorResult,
    Field(discriminator='kind'),
]

# Built once at import time and reused by `_post_json` for every response.
_RUN_RESPONSE_ADAPTER: TypeAdapter[_RunResponse] = TypeAdapter(_RunResponse)
class MontyRun(BaseModel):
    """Request body posted to `/run/` to start a run."""

    code: str
    # Never set by this client; left at its default.
    inputs: dict[str, Any] | None = None
    type_check: bool = False
    type_check_stubs: str | None = None
class ReturnValue(BaseModel):
    """A successfully resolved value sent back to the server."""

    return_value: Any
class ExceptionValue(BaseModel):
    """An exception sent back to the server, described by type name and message."""

    exception_type: str
    exception_message: str
# Outcome of an external call: a value, an exception, or the literal 'future'
# when the call is still running as a task.
ExternalWebResult: TypeAlias = ReturnValue | ExceptionValue | Literal['future']
class MontyResumeFunction(BaseModel):
    """Resume body answering a function snapshot."""

    kind: Literal['function'] = 'function'
    result: ExternalWebResult
class MontyResumeNameLookup(BaseModel):
    """Resume body answering a name-lookup snapshot."""

    kind: Literal['name_lookup'] = 'name_lookup'
    # `None` is sent when the name was not found on the client.
    value: ReturnValue | None
class MontyResumeFuture(BaseModel):
    """Resume body answering a future snapshot."""

    kind: Literal['future'] = 'future'
    # Results of the finished tasks, keyed by the snapshot id of the original call.
    results: dict[UUID, ExternalWebResult]
# --- Helpers -----------------------------------------------------------------

# Sentinel default for `getattr`, so an attribute that is genuinely `None`
# can be told apart from a missing one.
_MISSING: object = object()
| def _format_call(name: str, args: list[Any], kwargs: dict[str, Any]) -> str: | |
| parts = [repr(a) for a in args] + [f'{k}={v!r}' for k, v in kwargs.items()] | |
| return f'{name}({", ".join(parts)})' | |
def _as_exception(exc: BaseException) -> ExceptionValue:
    """Describe `exc` by its class name and `str()` text as an `ExceptionValue`."""
    type_name = type(exc).__name__
    message = str(exc)
    return ExceptionValue(exception_type=type_name, exception_message=message)
def _generate_stubs(globals_path: str) -> str:
    """Run `stubgen` over the globals file and return the resulting .pyi text.

    `stubgen` is invoked through `uv run --with mypy` from the globals file's
    directory and writes into a temporary directory, which is removed before
    this function returns.

    Raises:
        subprocess.CalledProcessError: if the command exits non-zero; its
            captured stderr is echoed to this process's stderr first.
    """
    print('running stubgen...')
    p = Path(globals_path).resolve()
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            subprocess.run(
                ['uv', 'run', '--with', 'mypy', 'stubgen', p.name, '--include-docstrings', '-o', tmpdir],
                capture_output=True,
                text=True,
                cwd=p.parent,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            # Output is captured, so without this the user would only see the
            # exit status and never stubgen's own error message.
            if exc.stderr:
                print(exc.stderr, file=sys.stderr, end='')
            raise
        print('type stubs generated')
        # Read while the temporary directory still exists; decode explicitly
        # rather than relying on the platform's locale encoding.
        return (Path(tmpdir) / f'{p.stem}.pyi').read_text(encoding='utf-8')
| def _load_globals(path: str | None) -> ModuleType: | |
| if path is None: | |
| return ModuleType('hackmonty_globals') | |
| spec = importlib.util.spec_from_file_location('hackmonty_globals', path) | |
| if spec is None or spec.loader is None: | |
| raise RuntimeError(f'cannot load globals module from {path}') | |
| module = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(module) | |
| return module | |
async def _post_json(client: httpx.AsyncClient, url: str, body: BaseModel) -> tuple[_RunResponse, float]:
    """POST `body` as JSON to `url` and return the parsed response with the elapsed seconds.

    The measured time covers serialising the body, the request itself and
    validating the response. A non-success status raises via
    `raise_for_status`.
    """
    started = time.monotonic()
    response = await client.post(url, json=body.model_dump(mode='json'), timeout=None)
    response.raise_for_status()
    message = _RUN_RESPONSE_ADAPTER.validate_python(response.json())
    elapsed = time.monotonic() - started
    return message, elapsed
| def _fmt_dt(dt: float) -> str: | |
| return f'[dim]{dt * 1000:>5.0f}ms →[/dim]' | |
async def _await_and_wrap(coro: Any) -> ReturnValue | ExceptionValue:
    """Await `coro` and wrap its result, or the `Exception` it raises, for the server."""
    try:
        awaited = await coro
    except Exception as err:
        wrapped: ReturnValue | ExceptionValue = _as_exception(err)
    else:
        # Built outside the `try` so only the awaited coroutine's errors are captured.
        wrapped = ReturnValue(return_value=awaited)
    return wrapped
# Script entry point: the value returned by `cli()` becomes the exit status.
if __name__ == '__main__':
    sys.exit(cli())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment