Last active
September 10, 2024 20:03
-
-
Save tekknolagi/4bee494a6e4483e4d849559ba53d067b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import multiprocessing | |
import random | |
import time | |
class Logger:
    """Render one status line per active repo in a fixed-height terminal region.

    Each call to ``log`` records the latest message for a repo, erases the
    region with ANSI escape sequences, and redraws every active repo's line.

    Args:
        num_lines: Height, in terminal lines, of the region that is redrawn.
        last_output_per_process: Mapping from repo name to its latest message.
        terminal_lock: Context manager held while the mapping is mutated and
            the terminal is redrawn.
    """

    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        """Print blank lines so the region is always ``num_lines`` tall."""
        to_fill = self.num_lines - len(self.last_output_per_process)
        for _ in range(to_fill):
            print()

    def clean_up(self):
        """Erase the ``num_lines`` lines directly above the cursor."""
        for _ in range(self.num_lines):
            print("\x1b[1A\x1b[2K", end="")  # move up cursor and delete whole line

    def log(self, repo_name, *args):
        """Record a message for ``repo_name`` and redraw the whole region.

        Args:
            repo_name: Key identifying the line to update.
            *args: Message parts; each is converted with ``str`` and the
                results are joined with single spaces.
        """
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
            self.clean_up()
            # Sort by repo name so each repo keeps a stable position in the
            # display instead of following dict insertion order.
            sorted_lines = sorted(self.last_output_per_process.items())
            for name, last_line in sorted_lines:
                print(f"{name}: {last_line}")
            self.fill_output()

    def done(self, repo_name):
        """Stop displaying ``repo_name``.

        Safe to call for a repo that never logged or was already removed.
        """
        with self.terminal_lock:
            # pop() with a default avoids a KeyError on unknown/finished repos.
            self.last_output_per_process.pop(repo_name, None)
class MultiprocessingLogger(Logger):
    """Logger whose state and lock are created by a multiprocessing manager."""

    def __init__(self, num_lines, manager):
        # Both objects come from the manager so they can be shared by workers.
        shared_lines = manager.dict()
        shared_lock = manager.Lock()
        super().__init__(num_lines, shared_lines, shared_lock)
class FakeLock:
    """Do-nothing context manager usable wherever a lock is expected."""

    def __enter__(self):
        # Nothing to acquire.
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        # Nothing to release; returning None lets any exception propagate.
        return None
class SingleProcessLogger(Logger):
    """Logger for one process: a plain dict for state and a no-op lock."""

    def __init__(self, num_lines):
        local_lines = {}
        no_op_lock = FakeLock()
        super().__init__(num_lines, local_lines, no_op_lock)
def randsleep():
    """Sleep for a random duration: randint(1, 2) / randint(1, 5) seconds."""
    numerator = random.randint(1, 2)
    denominator = random.randint(1, 5)
    time.sleep(numerator / denominator)
def func(repo_name):
    """Simulate processing one repo, logging each stage to the global logger.

    After every stage message the function sleeps for a random interval, and
    once all stages are logged the repo is removed from the display.
    """
    stages = (
        "Starting",
        "Installing",
        "Building",
        "Instrumenting",
        "Running tests",
        f"Result in {repo_name}.json",
    )
    for message in stages:
        logger.log(repo_name, message)
        randsleep()
    logger.done(repo_name)
def _install_worker_logger(shared_logger):
    """Bind ``shared_logger`` to the module-level ``logger`` in a worker process."""
    global logger
    logger = shared_logger


def multi_process_demo():
    """Run ``func`` over 26 fake repos in a process pool with a shared logger.

    The display gets one line per worker process, capped at the number of repos.
    """
    global logger
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        # Pass the logger to every worker explicitly. Under the "spawn" and
        # "forkserver" start methods workers do not inherit globals assigned
        # at runtime in the parent, so ``func`` would otherwise fail with a
        # NameError on ``logger``.
        with multiprocessing.Pool(
            num_procs,
            initializer=_install_worker_logger,
            initargs=(logger,),
        ) as pool:
            # chunksize=1 hands out one repo at a time.
            pool.map(func, repos, chunksize=1)
        logger.clean_up()
def single_process_demo():
    """Run ``func`` for a single repo in this process with a one-line display."""
    global logger
    logger = SingleProcessLogger(1)
    # Reserve the display line before the first redraw.
    logger.fill_output()
    func("repoA")
    logger.clean_up()
# Script entry point: runs the multi-process demo by default. The guard also
# keeps the demo from re-running when worker processes import this module.
if __name__ == "__main__":
    multi_process_demo()
    # single_process_demo()
That's really weird. Try putting that call under `if __name__ == "__main__":`.
Looks similar to what people ran into in nedbat/coveragepy#890 except that we don't use chdir. Apparently it could also be related to https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This code doesn't run for me: