Skip to content

Instantly share code, notes, and snippets.

@ichard26
Last active November 5, 2023 00:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ichard26/0259d85dd4cc65f1c51261ac512b93e2 to your computer and use it in GitHub Desktop.
Save ichard26/0259d85dd4cc65f1c51261ac512b93e2 to your computer and use it in GitHub Desktop.
Utility program to compile scripts with mypyc and optionally run them. The generated C and IR can be opened in an editor too. Comes with progress reporting.
#!/usr/bin/env python
import os
import time
import shutil
import socket
import subprocess
import sys
import threading
from contextlib import contextmanager
from pathlib import Path
from typing import List, Iterator
import click
import msgsock
THIS_DIR = Path(__file__).parent
MYPYC_DIR = THIS_DIR / "mypyc"
INDENT = " " * 4
def find_line(text: str, lines: List[str]) -> int:
for n, line in enumerate(lines):
if text in line:
return n
raise ValueError(f"cannot find: {text}")
def report_message(message: str = "", *, ansi: str = "") -> None:
spaces = " " * 30
if ansi:
print(f"{ansi}{message}{spaces}\033[0m\r", end="", flush=True)
else:
print(f"{message}{spaces}\r", end="", flush=True)
@contextmanager
def patch_mypyc(*, inject_progress_reporting: bool) -> Iterator[None]:
file = Path(MYPYC_DIR, "build.py")
original = file.read_text("utf-8")
# Enable separate compilation as it's *much* faster.
modified = original.replace("compiler_options.separate", "True")
lines = modified.splitlines()
if inject_progress_reporting:
create_msgsock_lines = [
INDENT + "import socket",
INDENT + "from msgsock import MessageSocket",
INDENT + "sock = socket.create_connection(('localhost', 7612))",
INDENT + "conn = MessageSocket(sock)",
]
# Inject writes to a message socket so stage progress can be communicated.
# .. but first we have to find the right lines to inject code next to though.
parsing_line_no = find_line("emitmodule.parse_and_typecheck", lines)
transforming_line_no = find_line("Parsed and typechecked", lines)
compiling_line_no = find_line("Compiled to C", lines)
assert 0 < parsing_line_no < transforming_line_no < compiling_line_no
lines = [
*lines[:parsing_line_no - 1],
*create_msgsock_lines,
*lines[parsing_line_no - 1:]
]
# Tweak line numbers after inserting the message socket creation shim.
parsing_line_no += len(create_msgsock_lines)
transforming_line_no += len(create_msgsock_lines)
compiling_line_no += len(create_msgsock_lines)
lines.insert(parsing_line_no - 1, f"{INDENT}conn.send_message('\033[0;33m|[1/3] Parsing and typechecking')")
lines.insert(transforming_line_no + 2, INDENT + "conn.send_message('\033[0;35m|[2/3] Building IR and writing C')")
lines.insert(compiling_line_no + 1, INDENT + "conn.send_message('\033[0;36m|[3/3] Running C compiler')")
lines.insert(compiling_line_no + 2, INDENT + "conn.send_message('LAST-MSG')")
lines.insert(compiling_line_no + 3, INDENT + "sock.close()")
file.write_text("\n".join(lines) + "\n", "utf-8")
try:
yield
finally:
file.write_text(original, "utf-8")
@contextmanager
def message_passthrough_server() -> Iterator[None]:
sock_server = socket.create_server(("localhost", 7612))
def server_main() -> None:
sock, _ = sock_server.accept()
conn = msgsock.MessageSocket(sock)
while (message := conn.receive_message()) != "LAST-MSG":
ansi, text = message.split("|")
report_message(text, ansi=ansi)
sock.close()
thread = threading.Thread(target=server_main)
thread.start()
try:
yield
finally:
thread.join()
@click.command
@click.argument("src", type=click.Path(exists=True, path_type=Path))
@click.option("-r/-R", "--run/--no-run", help="Run (import) extension after compilation.")
@click.option("-i", "--interactive", is_flag=True, help="Run extension with the --interactive flag.")
@click.option("-C", "--inspect-c", "inspect_output", flag_value="c", help="Open generated C in an editor.")
@click.option("-IR", "--inspect-ir", "inspect_output", flag_value="ir", help="Open generated IR in an editor.")
@click.option("-v", "--verbose", is_flag=True)
@click.option("-d", "--passthrough", is_flag=True, help="Disable compilation output capturing (to allow PDB to work).")
def main(
src: Path,
run: bool,
interactive: bool,
inspect_output: str,
verbose: bool,
passthrough: bool
) -> None:
"""mypyc CLI shim."""
start_time = time.monotonic()
if (build_dir := (Path.cwd() / "build")).exists():
pass
else:
build_dir = THIS_DIR / "build"
try:
shutil.rmtree(build_dir)
except FileNotFoundError:
pass
with patch_mypyc(inject_progress_reporting=not passthrough):
if passthrough:
proc = subprocess.run(
[sys.executable, "-m", "mypyc", str(src)],
env={**os.environ, "MYPYC_OPT_LEVEL": "0"}
)
else:
with message_passthrough_server():
proc = subprocess.run(
[sys.executable, "-m", "mypyc", str(src)],
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf-8",
env={**os.environ, "MYPYC_OPT_LEVEL": "0"}
)
report_message()
if proc.returncode or verbose:
print(proc.stdout.strip())
if proc.returncode:
sys.exit(proc.returncode)
if run:
cmd = [sys.executable, "-c", f"import {src.stem}"]
if interactive:
cmd.insert(1, "-i")
proc = subprocess.run(cmd)
c_file = Path.cwd() / "build" / "__native.c"
lines = len(c_file.read_text().splitlines())
elapsed = time.monotonic() - start_time
click.secho(
f"[{src.name} in {elapsed:.2f}s: ({lines}) return code {proc.returncode}]",
dim=True
)
if inspect_output:
editor = os.getenv("EDITOR") or "/usr/bin/vim"
if inspect_output == "c":
target = Path.cwd() / "build" / "__native.c"
elif inspect_output == "ir":
target = Path.cwd() / "build" / "ops.txt"
subprocess.run([editor, target])
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment