Last active
November 22, 2023 20:34
-
-
Save biasedbit/16a2dcee74fb38cb86d46dc181ac332d to your computer and use it in GitHub Desktop.
Worker runner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
import os | |
import signal | |
import subprocess | |
import sys | |
import threading | |
from enum import Enum | |
from typing import Any, Optional | |
# Script to run multiple workers in parallel as sub-processes. Blocks until all
# sub-processes have exited. Sends SIGTERM to all workers on the first SIGINT,
# and SIGKILL on the second SIGINT.
class Color(Enum):
    """ANSI escape-sequence color codes available for log prefixes.

    Each member's value is the numeric parameter placed between ``\\033[`` and
    ``m`` to switch the terminal's foreground color.
    """

    RED = 31
    GREEN = 32
    YELLOW = 33
    BLUE = 34
    MAGENTA = 35
    CYAN = 36


# Numeric parameter used in the same escape sequence to switch the color back
# off after a prefix has been printed.
_COLOR_RESET = 0
class SubprocRunner:
    """Run one worker command as a child process and relay its output.

    ``run()`` is meant to be the target of a dedicated thread: it starts the
    command, echoes every line the child writes (stdout and stderr merged)
    behind a colored ``[name]`` prefix, and returns once the child has exited.
    ``stop()`` asks the child to exit and may be called from another thread or
    from a signal handler.
    """

    # The running (or finished) child; ``None`` until ``run()`` has started it.
    _proc: Optional[subprocess.Popen[str]] = None

    def __init__(self, name: str, color: Color, cmd: list[str]) -> None:
        """Remember the display name, prefix color and command line.

        Nothing is started here; the process is created by ``run()``.
        """
        self.name = name
        self.color = color
        self.cmd = cmd

    def run(self) -> None:
        """Start the command and relay its output until the child exits.

        Blocks the calling thread for the lifetime of the child. Calling it
        again once a process has been started is a no-op.
        """
        if self._proc is not None:
            return None

        # The child is moved into its own process group so that a ctrl+c in
        # the terminal is not delivered to it directly; only this script
        # decides which signal the workers receive. ``preexec_fn`` is not used
        # for this because the subprocess documentation warns it is unsafe
        # when the parent has other threads, and ``run()`` is called from
        # worker threads. ``process_group=0`` is the exact equivalent of
        # ``os.setpgrp`` but only exists on Python 3.11+; older interpreters
        # fall back to a new session, which also implies a new process group.
        if sys.version_info >= (3, 11):
            isolation: dict[str, Any] = {"process_group": 0}
        else:
            isolation = {"start_new_session": True}

        proc = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            **isolation,
        )
        self._proc = proc
        print(f"🚀 Subprocess {self.name} running (pid={proc.pid}).")

        # Iterating the pipe blocks until a line or EOF arrives. Leaving the
        # ``with`` block closes the pipe and blocks in ``wait()`` until the
        # child is reaped, so there is no polling loop that could spin when
        # the pipe hits EOF before the exit status is available.
        with proc:
            assert proc.stdout is not None
            for line in proc.stdout:
                self._log(line.strip())

        emoji = "🟢" if proc.returncode == 0 else "🔴"
        print(f"{emoji} Subprocess {self.name} exited with code {proc.returncode}")

    def stop(self, kill: bool = False) -> None:
        """Signal the child to exit without waiting for it.

        Sends SIGKILL when *kill* is true, SIGTERM otherwise. Does nothing if
        the process has not been started yet.

        This method deliberately does not call ``Popen.wait()``. It is invoked
        from the SIGINT handler, and a blocking wait there keeps Popen's
        internal, non-reentrant wait lock held; a second SIGINT arriving
        during that wait re-enters the handler on the same thread and would
        block forever trying to wait on the same process. Waiting here would
        also delay signalling the remaining workers until this one has exited.
        The thread executing ``run()`` reaps the child instead.
        """
        if self._proc is None:
            return None

        if kill:
            self._proc.kill()
        else:
            self._proc.terminate()

    def _log(self, msg: str) -> None:
        """Print *msg* behind this worker's ``[name]`` prefix in its color."""
        print(f"\033[{self.color.value}m[{self.name}] \033[{_COLOR_RESET}m{msg}")
# Number of SIGINTs handled so far: the first one asks the workers to stop,
# every later one escalates to a kill.
sigints: int = 0
# Runners registered by the main block; the signal handler stops all of them.
subprocs: list[SubprocRunner] = []


def stop_subprocs(sig: int, _: Any) -> None:
    """Signal handler that stops every registered runner.

    On the first call each runner is stopped normally. On any further call
    each runner is stopped with ``kill=True`` and the script exits with
    status 1. Both arguments (signal number and stack frame) are ignored.
    """
    global sigints
    sigints += 1

    if sigints <= 1:
        print("\n✋ Sending SIGTERM to workers (ctrl+c again to send SIGKILL)...")
        for runner in subprocs:
            runner.stop()
        return

    print("\n🪓 Sending SIGKILL to workers...")
    for runner in subprocs:
        runner.stop(kill=True)
    sys.exit(1)
if __name__ == "__main__":
    # Everything after the script name is a worker module name.
    workers: list[str] = sys.argv[1:]
    if not workers:
        print("Usage: poetry run-workers <worker1> <worker2> ...")
        sys.exit(1)

    # Route SIGINT to stop_subprocs so ctrl+c is turned into stop requests for
    # the children. With the handler installed, Python no longer raises
    # KeyboardInterrupt in the code below.
    signal.signal(signal.SIGINT, stop_subprocs)

    threads: list[threading.Thread] = []
    # Colors repeat from the start when there are more workers than colors.
    palette = itertools.cycle(Color)

    try:
        # One runner and one thread per worker; each thread is started as soon
        # as it has been created.
        for worker_name, worker_color in zip(workers, palette):
            command = ["poetry", "run", "python3", "-u", "-m", f"workers.{worker_name}"]
            runner = SubprocRunner(worker_name, worker_color, command)
            subprocs.append(runner)

            relay = threading.Thread(target=runner.run)
            threads.append(relay)
            relay.start()

        # Block until every runner thread has finished.
        for relay in threads:
            relay.join()
    except KeyboardInterrupt:
        pass
# -- pyproject.toml task
# [tool.poe.tasks.run-workers]
# envfile = ".env"
# args = [
# { name = "worker", positional = true, multiple = true, required = true },
# ]
# cmd = "poetry run python ./scripts/run_workers.py $worker"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment