Skip to content

Instantly share code, notes, and snippets.

@biasedbit
Last active November 22, 2023 20:34
Show Gist options
  • Save biasedbit/16a2dcee74fb38cb86d46dc181ac332d to your computer and use it in GitHub Desktop.
Save biasedbit/16a2dcee74fb38cb86d46dc181ac332d to your computer and use it in GitHub Desktop.
Worker runner
import itertools
import os
import signal
import subprocess
import sys
import threading
from enum import Enum
from typing import Any, Optional
# Script to run multiple workers in parallel as sub-processes. Blocks until all
# sub-processes have exited. Sends SIGTERM to all workers on SIGINT, and
# SIGKILL on second SIGINT.
class Color(Enum):
    """ANSI color codes available for tagging a worker's log prefix.

    Member order is significant: iterating the enum yields the colors in the
    order they are declared here.
    """

    RED = 31
    GREEN = 32
    YELLOW = 33
    BLUE = 34
    MAGENTA = 35
    CYAN = 36


# ANSI code emitted after a colored prefix to reset the terminal color.
_COLOR_RESET = 0
class SubprocRunner:
    """Run one command as a subprocess and relay its output line by line.

    The child's stderr is merged into its stdout, and every line read from
    that stream is printed by this process, prefixed with the runner's name
    in its assigned color.
    """

    # Handle of the spawned child; stays None until run() has started it.
    _proc: Optional[subprocess.Popen[str]] = None

    def __init__(self, name: str, color: Color, cmd: list[str]) -> None:
        """Store the runner configuration; nothing is started here.

        Args:
            name: Label used in the log prefix and in status messages.
            color: Color whose ``value`` is used as the ANSI code of the prefix.
            cmd: Argument vector passed to ``subprocess.Popen``.
        """
        self.name = name
        self.color = color
        self.cmd = cmd

    def run(self) -> None:
        """Start the subprocess and block until it has exited.

        Output lines are echoed through ``_log`` as they arrive. Calling this
        again after the subprocess was started is a no-op.
        """
        if self._proc is not None:
            return None

        # The child is placed in its own process group, as the previous
        # ``preexec_fn=os.setpgrp`` did. ``preexec_fn`` is documented as unsafe
        # when the application uses threads (the child may deadlock before
        # exec), so the dedicated Popen parameters are used instead.
        if sys.version_info >= (3, 11):
            # Exact equivalent of os.setpgrp(): setpgid(0, 0) in the child.
            group_opts: dict[str, Any] = {"process_group": 0}
        else:
            # Older interpreters lack ``process_group``; setsid() also gives
            # the child a fresh process group (and additionally a new session).
            group_opts = {"start_new_session": True}

        self._proc = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            **group_opts,
        )
        print(f"🚀 Subprocess {self.name} running (pid={self._proc.pid}).")

        # The context manager closes the pipe and waits for the child on exit,
        # so the file descriptor is not leaked and ``returncode`` is set below.
        with self._proc as proc:
            assert proc.stdout is not None
            # Iterating blocks until EOF. Unlike polling readline()/poll(),
            # this cannot spin when the child closes stdout before it exits.
            for line in proc.stdout:
                self._log(line.strip())

        emoji = "🟢" if self._proc.returncode == 0 else "🔴"
        print(f"{emoji} Subprocess {self.name} exited with code {self._proc.returncode}")

    def stop(self, kill: bool = False) -> None:
        """Signal the subprocess to stop and wait until it has exited.

        Args:
            kill: When true use ``Popen.kill()``, otherwise
                ``Popen.terminate()``.

        Does nothing if the subprocess has not been started yet.
        """
        if self._proc is None:
            return None
        if kill:
            self._proc.kill()
        else:
            self._proc.terminate()
        self._proc.wait()

    def _log(self, msg: str) -> None:
        """Print ``msg`` prefixed with ``[name]`` in the runner's color."""
        print(f"\033[{self.color.value}m[{self.name}] \033[{_COLOR_RESET}m{msg}")
# Number of times stop_subprocs() has been invoked so far.
sigints: int = 0
# Every runner created by the script; stop_subprocs() stops all of them.
subprocs: list[SubprocRunner] = []


def stop_subprocs(sig: int, _: Any) -> None:
    """Stop all registered runners; escalate on repeated invocation.

    The first call asks every runner to terminate. Any later call kills every
    runner and then exits this process with status 1. Both arguments (signal
    number and frame, as passed to a signal handler) are ignored.
    """
    global sigints
    sigints += 1
    force = sigints > 1

    if force:
        print("\n🪓 Sending SIGKILL to workers...")
    else:
        print("\n✋ Sending SIGTERM to workers (ctrl+c again to send SIGKILL)...")

    for runner in subprocs:
        runner.stop(kill=force)

    if force:
        sys.exit(1)
if __name__ == "__main__":
    # Everything after the script name is a worker name.
    workers: list[str] = sys.argv[1:]
    if not workers:
        print("Usage: poetry run-workers <worker1> <worker2> ...")
        sys.exit(1)

    # Route SIGINT to stop_subprocs so the workers are stopped by the handler;
    # with a custom handler installed, ctrl+c no longer raises
    # KeyboardInterrupt in the code below.
    signal.signal(signal.SIGINT, stop_subprocs)

    threads: list[threading.Thread] = []
    try:
        # Colors are handed out in declaration order and wrap around when
        # there are more workers than colors.
        for worker, color in zip(workers, itertools.cycle(Color)):
            command = ["poetry", "run", "python3", "-u", "-m", f"workers.{worker}"]
            runner = SubprocRunner(worker, color, command)
            subprocs.append(runner)

            # Each runner blocks while relaying output, so it gets a thread.
            thread = threading.Thread(target=runner.run)
            threads.append(thread)
            thread.start()

        # Block until every worker has exited.
        for thread in threads:
            thread.join()
    except KeyboardInterrupt:
        pass
# -- pyproject.toml task
# [tool.poe.tasks.run-workers]
# envfile = ".env"
# args = [
# { name = "worker", positional = true, multiple = true, required = true },
# ]
# cmd = "poetry run python ./scripts/run_workers.py $worker"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment