Last active
November 5, 2023 00:38
-
-
Save ichard26/0259d85dd4cc65f1c51261ac512b93e2 to your computer and use it in GitHub Desktop.
Utility program to compile scripts with mypyc and optionally run them. The generated C and IR can be opened in an editor too. Comes with progress reporting.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import os | |
import time | |
import shutil | |
import socket | |
import subprocess | |
import sys | |
import threading | |
from contextlib import contextmanager | |
from pathlib import Path | |
from typing import List, Iterator | |
import click | |
import msgsock | |
# Directory that contains this script.
THIS_DIR = Path(__file__).parent
# Directory holding the mypyc sources; patch_mypyc() rewrites build.py inside it.
MYPYC_DIR = THIS_DIR / "mypyc"
# One indentation level (four spaces), prepended to lines injected into build.py.
INDENT = " " * 4
def find_line(text: str, lines: List[str]) -> int:
    """Return the zero-based index of the first entry of *lines* containing *text*.

    Raises:
        ValueError: if no entry contains *text*.
    """
    hits = (index for index, candidate in enumerate(lines) if text in candidate)
    position = next(hits, None)
    if position is None:
        raise ValueError(f"cannot find: {text}")
    return position
def report_message(message: str = "", *, ansi: str = "") -> None:
    """Write a status line that the next call will overwrite.

    The message is followed by 30 spaces and a carriage return instead of a
    newline, and stdout is flushed immediately. When *ansi* is non-empty it is
    emitted before the message and an SGR reset sequence is emitted after the
    padding.
    """
    padding = " " * 30
    if not ansi:
        status = f"{message}{padding}\r"
    else:
        status = f"{ansi}{message}{padding}\033[0m\r"
    print(status, end="", flush=True)
@contextmanager
def patch_mypyc(*, inject_progress_reporting: bool) -> Iterator[None]:
    """Temporarily rewrite mypyc's ``build.py`` while the ``with`` body runs.

    Every occurrence of ``compiler_options.separate`` in the file is replaced
    by ``True``. When *inject_progress_reporting* is true, statements are also
    spliced in that connect to ``localhost:7612`` and send one message per
    build stage, followed by ``LAST-MSG``. The original file contents are
    written back on exit, including when patching or the body fails.

    Raises:
        ValueError: if an anchor line cannot be found in ``build.py`` or the
            anchors do not appear in the expected order.
    """
    file = Path(MYPYC_DIR, "build.py")
    original = file.read_text("utf-8")
    # Enable separate compilation as it's *much* faster.
    modified = original.replace("compiler_options.separate", "True")
    lines = modified.splitlines()

    if inject_progress_reporting:
        create_msgsock_lines = [
            INDENT + "import socket",
            INDENT + "from msgsock import MessageSocket",
            INDENT + "sock = socket.create_connection(('localhost', 7612))",
            INDENT + "conn = MessageSocket(sock)",
        ]
        # Inject writes to a message socket so stage progress can be communicated.
        # First locate the lines the injected code has to sit next to.
        parsing_line_no = find_line("emitmodule.parse_and_typecheck", lines)
        transforming_line_no = find_line("Parsed and typechecked", lines)
        compiling_line_no = find_line("Compiled to C", lines)
        # An explicit check rather than ``assert``: with asserts stripped (-O)
        # an index of 0 would make the slices below silently corrupt the file.
        if not 0 < parsing_line_no < transforming_line_no < compiling_line_no:
            raise ValueError(
                "unexpected layout of mypyc build.py; cannot inject progress reporting"
            )

        # The connection shim goes in front of the line preceding the parse call.
        lines = [
            *lines[:parsing_line_no - 1],
            *create_msgsock_lines,
            *lines[parsing_line_no - 1:]
        ]
        # Tweak line numbers after inserting the message socket creation shim.
        shift = len(create_msgsock_lines)
        parsing_line_no += shift
        transforming_line_no += shift
        compiling_line_no += shift

        # NOTE(review): the anchors are not re-adjusted after each insert; the
        # fixed offsets below are presumably tuned to the layout of the
        # targeted mypyc version -- re-check them when mypyc is upgraded.
        lines.insert(parsing_line_no - 1, INDENT + "conn.send_message('\033[0;33m|[1/3] Parsing and typechecking')")
        lines.insert(transforming_line_no + 2, INDENT + "conn.send_message('\033[0;35m|[2/3] Building IR and writing C')")
        lines.insert(compiling_line_no + 1, INDENT + "conn.send_message('\033[0;36m|[3/3] Running C compiler')")
        lines.insert(compiling_line_no + 2, INDENT + "conn.send_message('LAST-MSG')")
        lines.insert(compiling_line_no + 3, INDENT + "sock.close()")

    try:
        # Writing inside the ``try`` guarantees the original is restored even
        # if the write itself fails or is interrupted part-way.
        file.write_text("\n".join(lines) + "\n", "utf-8")
        yield
    finally:
        file.write_text(original, "utf-8")
@contextmanager
def message_passthrough_server() -> Iterator[None]:
    """Run a background server that relays progress messages to the terminal.

    A listening socket is opened on ``localhost:7612`` and a thread accepts a
    single connection on it. Each received message is split at ``|`` into an
    ANSI prefix and a text, which are passed to ``report_message``; the
    message ``LAST-MSG`` ends the loop. On exit from the ``with`` body the
    thread is joined and the listening socket is closed.
    """
    sock_server = socket.create_server(("localhost", 7612))
    # Poll accept() instead of blocking forever, so the thread can give up
    # when the with-body finished without any client ever connecting.
    # The accepted connection itself stays in blocking mode.
    sock_server.settimeout(0.1)
    body_finished = threading.Event()

    def server_main() -> None:
        while True:
            # Sample the flag *before* accepting: a timeout that started after
            # the body finished means no connection is pending any more.
            finishing = body_finished.is_set()
            try:
                sock, _ = sock_server.accept()
            except socket.timeout:
                if finishing:
                    return
                continue
            break
        # ``with`` closes the connection even if the receive loop raises.
        # NOTE(review): what receive_message() does when the peer disconnects
        # without sending LAST-MSG depends on msgsock -- confirm.
        with sock:
            conn = msgsock.MessageSocket(sock)
            while (message := conn.receive_message()) != "LAST-MSG":
                ansi, text = message.split("|")
                report_message(text, ansi=ansi)

    thread = threading.Thread(target=server_main)
    thread.start()
    try:
        yield
    finally:
        body_finished.set()
        thread.join()
        # Release the listening port so a later run can bind it again.
        sock_server.close()
@click.command
@click.argument("src", type=click.Path(exists=True, path_type=Path))
@click.option("-r/-R", "--run/--no-run", help="Run (import) extension after compilation.")
@click.option("-i", "--interactive", is_flag=True, help="Run extension with the --interactive flag.")
@click.option("-C", "--inspect-c", "inspect_output", flag_value="c", help="Open generated C in an editor.")
@click.option("-IR", "--inspect-ir", "inspect_output", flag_value="ir", help="Open generated IR in an editor.")
@click.option("-v", "--verbose", is_flag=True)
@click.option("-d", "--passthrough", is_flag=True, help="Disable compilation output capturing (to allow PDB to work).")
def main(
    src: Path,
    run: bool,
    interactive: bool,
    inspect_output: str,
    verbose: bool,
    passthrough: bool
) -> None:
    """mypyc CLI shim."""
    # NOTE: the docstring above doubles as the click --help text, so the
    # longer description lives in comments.
    #
    # Steps: remove the stale build directory, compile ``src`` with mypyc
    # (with build.py patched for the duration), optionally import the built
    # extension, print a one-line summary, and optionally open the generated
    # C or IR in $EDITOR. ``inspect_output`` is "c", "ir", or falsy when
    # neither inspect flag was given.
    start_time = time.monotonic()

    # Prefer ./build; fall back to the build directory next to this script.
    build_dir = Path.cwd() / "build"
    if not build_dir.exists():
        build_dir = THIS_DIR / "build"
    try:
        shutil.rmtree(build_dir)
    except FileNotFoundError:
        pass

    compile_cmd = [sys.executable, "-m", "mypyc", str(src)]
    compile_env = {**os.environ, "MYPYC_OPT_LEVEL": "0"}
    with patch_mypyc(inject_progress_reporting=not passthrough):
        if passthrough:
            # Output goes straight to the terminal; proc.stdout stays None.
            proc = subprocess.run(compile_cmd, env=compile_env)
        else:
            with message_passthrough_server():
                proc = subprocess.run(
                    compile_cmd,
                    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8",
                    env=compile_env
                )
            # Blank out the last progress line.
            report_message()

    # Only captured output can be replayed; in passthrough mode there is
    # nothing to print and dereferencing ``None`` would raise.
    if proc.stdout is not None and (proc.returncode or verbose):
        print(proc.stdout.strip())
    if proc.returncode:
        sys.exit(proc.returncode)

    if run:
        cmd = [sys.executable, "-c", f"import {src.stem}"]
        if interactive:
            cmd.insert(1, "-i")
        # From here on ``proc`` refers to the run, so the summary below
        # reports the run's return code rather than the compiler's.
        proc = subprocess.run(cmd)

    c_file = Path.cwd() / "build" / "__native.c"
    lines = len(c_file.read_text().splitlines())
    elapsed = time.monotonic() - start_time
    click.secho(
        f"[{src.name} in {elapsed:.2f}s: ({lines}) return code {proc.returncode}]",
        dim=True
    )

    if inspect_output:
        editor = os.getenv("EDITOR") or "/usr/bin/vim"
        artifact_names = {"c": "__native.c", "ir": "ops.txt"}
        target = Path.cwd() / "build" / artifact_names[inspect_output]
        subprocess.run([editor, target])
# Invoke the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment