Skip to content

Instantly share code, notes, and snippets.

@tekknolagi
Last active March 16, 2024 11:14
Show Gist options
  • Star 25 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save tekknolagi/4bee494a6e4483e4d849559ba53d067b to your computer and use it in GitHub Desktop.
Save tekknolagi/4bee494a6e4483e4d849559ba53d067b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import multiprocessing
import random
import time
class Logger:
    """Maintain a fixed-height, live-updating status area on the terminal.

    Every call to :meth:`log` stores the latest message for one name and
    redraws the whole area: one ``name: message`` line per stored name,
    followed by blank lines up to ``num_lines``.

    Args:
        num_lines: Height of the status area, in terminal lines.
        last_output_per_process: Mutable mapping from a name to the most
            recent message logged under it.
        terminal_lock: Context manager held while the mapping is updated
            and the status area is redrawn.
    """

    def __init__(self, num_lines, last_output_per_process, terminal_lock):
        self.num_lines = num_lines
        self.last_output_per_process = last_output_per_process
        self.terminal_lock = terminal_lock

    def fill_output(self):
        """Print one blank line for each unused row of the status area."""
        to_fill = self.num_lines - len(self.last_output_per_process)
        # range() over a non-positive count is empty, so nothing is printed
        # once the mapping holds num_lines entries or more.
        for _ in range(to_fill):
            print()

    def clean_up(self):
        """Erase the ``num_lines`` terminal lines above the cursor."""
        # "\x1b[1A" moves the cursor up one line, "\x1b[2K" deletes that
        # whole line; repeat the pair once per row of the status area.
        print("\x1b[1A\x1b[2K" * self.num_lines, end="")

    def log(self, repo_name, *args):
        """Store the latest message for ``repo_name`` and redraw the area.

        Args:
            repo_name: Key under which the message is stored and displayed.
            *args: Message parts; each is converted with ``str`` and the
                results are joined with single spaces.
        """
        message = " ".join(str(arg) for arg in args)
        with self.terminal_lock:
            self.last_output_per_process[repo_name] = message
            self.clean_up()
            # Lines are drawn in the mapping's own iteration order.
            for name, last_line in self.last_output_per_process.items():
                print(f"{name}: {last_line}")
            self.fill_output()
            # Flush while the lock is still held: with a block-buffered
            # stdout the frame would otherwise be written out later and
            # could interleave with a frame drawn by another lock holder.
            print(end="", flush=True)

    def done(self, repo_name):
        """Remove ``repo_name`` from the status area's mapping.

        The area is not redrawn here; the entry disappears on the next
        :meth:`log` call.

        Raises:
            KeyError: If nothing is stored under ``repo_name``.
        """
        with self.terminal_lock:
            del self.last_output_per_process[repo_name]
class MultiprocessingLogger(Logger):
    """Logger whose message mapping and lock are both created by ``manager``."""

    def __init__(self, num_lines, manager):
        # Ask the manager for the dict first and the lock second.
        shared_output = manager.dict()
        shared_lock = manager.Lock()
        super().__init__(num_lines, shared_output, shared_lock)
class FakeLock:
    """Do-nothing context manager usable wherever a lock is expected."""

    def __enter__(self):
        """Enter the context without acquiring anything; yields ``None``."""
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave the context; returning ``None`` lets any exception propagate."""
        return None
class SingleProcessLogger(Logger):
    """Logger backed by a plain ``dict`` and a ``FakeLock``."""

    def __init__(self, num_lines):
        super().__init__(
            num_lines,
            last_output_per_process={},
            terminal_lock=FakeLock(),
        )
def randsleep():
    """Sleep for ``randint(1, 2) / randint(1, 5)`` seconds (0.2 to 2.0)."""
    numerator = random.randint(1, 2)
    denominator = random.randint(1, 5)
    time.sleep(numerator / denominator)
def func(repo_name):
    """Log a fixed sequence of stages for ``repo_name``, then mark it done.

    Each stage is reported through the module-level ``logger`` and followed
    by a ``randsleep()`` pause.
    """
    stages = (
        "Starting",
        "Installing",
        "Building",
        "Instrumenting",
        "Running tests",
        f"Result in {repo_name}.json",
    )
    for stage in stages:
        logger.log(repo_name, stage)
        randsleep()
    logger.done(repo_name)
def _init_worker(shared_logger):
    """Install ``shared_logger`` as this process's module-level ``logger``."""
    global logger
    logger = shared_logger


def multi_process_demo():
    """Run ``func`` over 26 fake repos in a process pool with live status lines.

    The status area is ``min(number of repos, cpu_count())`` lines tall, and
    the pool has ``cpu_count()`` workers.
    """
    global logger
    ascii_uppercase = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    repos = [f"repo{letter}" for letter in ascii_uppercase]
    num_procs = multiprocessing.cpu_count()
    num_lines = min(len(repos), num_procs)
    with multiprocessing.Manager() as manager:
        logger = MultiprocessingLogger(num_lines, manager)
        # Make space for our output
        logger.fill_output()
        # Hand the logger to every worker explicitly. A module-level global
        # assigned here is only visible to workers that were forked; under
        # the spawn/forkserver start methods each worker re-imports this
        # module and would otherwise find no ``logger`` at all.
        with multiprocessing.Pool(
            num_procs, initializer=_init_worker, initargs=(logger,)
        ) as pool:
            # chunksize=1 hands repos to workers one at a time.
            pool.map(func, repos, chunksize=1)
        logger.clean_up()
def single_process_demo():
    """Run ``func`` once for ``"repoA"`` in this process with a one-line area."""
    global logger
    logger = SingleProcessLogger(1)
    # Reserve the status line before the first redraw.
    logger.fill_output()
    func("repoA")
    logger.clean_up()
if __name__ == "__main__":
    # Keep the demo behind this guard: multiprocessing's spawn start method
    # re-imports the main module in every child, and starting processes at
    # import time raises the "bootstrapping phase" RuntimeError.
    multi_process_demo()
    # Alternative entry point without multiprocessing:
    # single_process_demo()
@FranklinChen
Copy link

This code doesn't run for me:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 131, in _main
    prepare(preparation_data)
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 246, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 297, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
                   ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen runpy>", line 286, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/chen/Downloads/lines.py", line 100, in <module>
    multi_process_demo()
  File "/Users/chen/Downloads/lines.py", line 80, in multi_process_demo
    with multiprocessing.Manager() as manager:
         ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/context.py", line 57, in Manager
    m.start()
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/managers.py", line 562, in start
    self._process.start()
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/context.py", line 289, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 164, in get_preparation_data
    _check_not_importing_main()
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/spawn.py", line 140, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

        To fix this issue, refer to the "Safe importing of main module"
        section in https://docs.python.org/3/library/multiprocessing.html
        
Traceback (most recent call last):
  File "/Users/chen/Downloads/lines.py", line 100, in <module>
    multi_process_demo()
  File "/Users/chen/Downloads/lines.py", line 80, in multi_process_demo
    with multiprocessing.Manager() as manager:
         ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/context.py", line 57, in Manager
    m.start()
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/managers.py", line 566, in start
    self._address = reader.recv()
                    ^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
          ^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 430, in _recv_bytes
    buf = self._recv(4)
          ^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.12/3.12.2_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/multiprocessing/connection.py", line 399, in _recv
    raise EOFError
EOFError

@tekknolagi
Copy link
Author

That's really weird. Try putting the call to `multi_process_demo()` under `if __name__ == "__main__":`

Looks similar to what people ran into in nedbat/coveragepy#890 except that we don't use chdir. Apparently it could also be related to https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment