#!/usr/bin/env python
import multiprocessing
import random
import time


class Logger:
    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        # Pad with blank lines so the status area always spans num_lines rows.
        to_fill = self.num_lines - len(self.last_output_per_process)
        for _ in range(to_fill):
            print()

    def clean_up(self):
        for _ in range(self.num_lines):
            # Move the cursor up one line and erase that whole line.
            print("\x1b[1A\x1b[2K", end="")

    def log(self, repo_name, *args):
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = " ".join(str(arg) for arg in args)
            self.clean_up()
            # Sort so each repo's status line stays in a stable position.
            sorted_lines = sorted(self.last_output_per_process.items())
            for name, last_line in sorted_lines:
                print(f"{name}: {last_line}")
            self.fill_output()

    def done(self, repo_name):
        with self.terminal_lock:
            del self.last_output_per_process[repo_name]


class MultiprocessingLogger(Logger):
    def __init__(self, num_lines, manager):
        # Manager-backed dict and lock are shared across worker processes.
        super().__init__(num_lines, manager.dict(), manager.Lock())


class FakeLock:
    # No-op context manager standing in for a real lock in the
    # single-process case.
    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_value, traceback):
        pass


class SingleProcessLogger(Logger):
    def __init__(self, num_lines):
        super().__init__(num_lines, {}, FakeLock())


def randsleep():
    time.sleep(random.randint(1, 2) / random.randint(1, 5))


def func(repo_name):
    logger.log(repo_name, "Starting")
    randsleep()
    logger.log(repo_name, "Installing")
    randsleep()
    logger.log(repo_name, "Building")
    randsleep()
    logger.log(repo_name, "Instrumenting")
    randsleep()
    logger.log(repo_name, "Running tests")
    randsleep()
    logger.log(repo_name, f"Result in {repo_name}.json")
    randsleep()
    logger.done(repo_name)


def multi_process_demo():
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        # func reads the logger as a global; workers inherit it when the
        # pool forks (this does not carry over under the spawn start method).
        global logger
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        with multiprocessing.Pool(num_procs) as pool:
            pool.map(func, repos, chunksize=1)
        logger.clean_up()


def single_process_demo():
    repo = "repoA"
    num_lines = 1
    global logger
    logger = SingleProcessLogger(num_lines)
    logger.fill_output()
    func(repo)
    logger.clean_up()


if __name__ == "__main__":
    multi_process_demo()
    # single_process_demo()
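
A note on the terminal trick: the in-place redraw relies on two ANSI escape sequences, "\x1b[1A" (cursor up one line) and "\x1b[2K" (erase the whole line). A standalone sketch of just that part, assuming an ANSI-capable terminal:

#!/usr/bin/env python
import time

# Draw one status line, then repeatedly move the cursor up,
# erase the line, and redraw it in place.
print("status: starting")
for step in ["installing", "building", "done"]:
    time.sleep(0.5)
    print("\x1b[1A\x1b[2K", end="")
    print(f"status: {step}")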
@tekknolagi (Author):

That's really weird. Try putting that under if __name__ == "__main__":

Looks similar to what people ran into in nedbat/coveragepy#890, except that we don't use chdir. Apparently it could also be related to the spawn and forkserver start methods: https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
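
Something like this might work as the guard (a sketch; forcing "fork" is an assumption and only available on platforms that support it, e.g. not Windows):

import multiprocessing

if __name__ == "__main__":
    # Under the default "spawn" start method on macOS and Windows, each
    # worker re-imports this module, so any top-level call to
    # multi_process_demo() would run again in every worker. Guarding the
    # entry point avoids that; forcing "fork" also sidesteps the re-import.
    multiprocessing.set_start_method("fork")
    multi_process_demo()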
