I wanted to observe the overall and per-work-package progress of a Python process pool that executes a large set of long-ish running tasks. It turns out that there are some issues in our beloved progress bar library, tqdm, that cause multiprocessing use cases to mess up your terminal pretty badly.
There seems to be an ongoing discussion about the problem here: tqdm/tqdm#1000. In the meantime, I created the workaround below.
I want to track the overall progress (as in the percentage of work packages completed) and the progress of all active processes. Once a work package is done, I want its progress bar to disappear, because I have a lot of work packages.
The key idea is to use a multiprocessing.Array to keep track of which progress bar positions are currently in use by any of the processes.
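A minimal, single-process sketch of that slot bookkeeping, using hypothetical helper names `acquire_slot` / `release_slot` that take the shared array as a parameter. The check-and-set runs under the array's lock, the lowest free index is handed out, and a released slot becomes available again:

```python
from multiprocessing import Array
from typing import Optional


def acquire_slot(slots) -> Optional[int]:
    """Mark the lowest free slot as busy and return its index (None if all are busy)."""
    # Hold the lock for the whole check-and-set so two processes can't claim the same slot
    with slots.get_lock():
        for i in range(len(slots)):
            if slots[i] == 0:
                slots[i] = 1
                return i
    return None


def release_slot(slots, i: int) -> None:
    """Mark slot i as free again."""
    with slots.get_lock():
        slots[i] = 0


slots = Array("B", [0] * 3)  # 0 = free, 1 = in use
a, b, c = acquire_slot(slots), acquire_slot(slots), acquire_slot(slots)
print(a, b, c)              # 0 1 2
print(acquire_slot(slots))  # None, every slot is taken
release_slot(slots, b)
print(acquire_slot(slots))  # 1, the freed slot is handed out again
```

Handing out the lowest free index keeps the active bars packed directly below the overall bar instead of leaving gaps.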
At first I made a more naive attempt: enumerating my work packages and using workpackage_id % num_of_processes for the position. But since my work packages don't all take exactly the same amount of time, I ended up with multiple active processes using the same slot.
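To illustrate why the modulo mapping breaks, here is a simplified scheduling model. It is an assumption, not how multiprocessing.Pool actually dispatches tasks (it ignores chunking and simply gives each package, in order, to whichever worker becomes free first). The helper names `simulate` and `modulo_collisions` are hypothetical:

```python
import heapq
from typing import List, Tuple


def simulate(durations: List[float], workers: int) -> List[Tuple[float, float]]:
    """Return the (start, end) time of each package when `workers` workers
    pick up packages in order as soon as they become free."""
    free_at = [0.0] * workers  # min-heap of times at which each worker is free
    heapq.heapify(free_at)
    intervals = []
    for d in durations:
        start = heapq.heappop(free_at)
        end = start + d
        intervals.append((start, end))
        heapq.heappush(free_at, end)
    return intervals


def modulo_collisions(durations: List[float], workers: int) -> List[Tuple[int, int]]:
    """List pairs of packages that run at the same time AND get the same
    `package_id % workers` position, i.e. whose bars would overwrite each other."""
    intervals = simulate(durations, workers)
    clashes = []
    for a in range(len(intervals)):
        for b in range(a + 1, len(intervals)):
            same_slot = a % workers == b % workers
            overlap = intervals[a][0] < intervals[b][1] and intervals[b][0] < intervals[a][1]
            if same_slot and overlap:
                clashes.append((a, b))
    return clashes


print(modulo_collisions([1, 1, 1, 1], workers=2))  # [], equal durations are fine
print(modulo_collisions([5, 1, 1, 1], workers=2))  # [(0, 2)]
```

With one slow package, package 2 starts while package 0 is still running, and both map to position 0. Tracking which slots are actually free avoids this regardless of task durations.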
from multiprocessing import Array
from multiprocessing.pool import Pool
from random import randrange
from time import sleep
from typing import Tuple

from tqdm import tqdm

# You could replace that with multiprocessing.cpu_count()
PROCESSES = 20

# Shared array that lets all processes keep track of which tqdm "positions" are
# occupied (1) / free (0). It is created in the main process and handed to every
# worker through the pool initializer, so it is also shared when the "spawn" start
# method is used (the default on Windows and macOS). A module-level Array would be
# re-created in each spawned worker and not shared at all.
pgb_slots = None


def init_worker(lock, slots) -> None:
    global pgb_slots
    # Share tqdm's write lock so bars from different processes don't garble each other
    tqdm.set_lock(lock)
    pgb_slots = slots


def get_pgb_pos() -> int:
    # Acquire lock and claim the first free progress bar slot
    with pgb_slots.get_lock():
        for i in range(PROCESSES):
            if pgb_slots[i] == 0:
                pgb_slots[i] = 1
                return i
    # Cannot happen as long as there are at least as many slots as worker processes
    raise RuntimeError("No free progress bar slot")


def release_pgb_pos(slot: int) -> None:
    with pgb_slots.get_lock():
        pgb_slots[slot] = 0


def do_work(package_number: int) -> Tuple[int, int]:
    pgb_pos = get_pgb_pos()
    try:
        for _ in tqdm(
            range(10),
            total=10,
            desc=f"Work package: {package_number}",
            # +1 so we do not overwrite the overall progress
            position=pgb_pos + 1,
            leave=False,
        ):
            sleep(randrange(1, 3) / 10)
    finally:
        # Free the slot even if the work package raised an exception
        release_pgb_pos(pgb_pos)
    result = package_number
    return package_number, result


if __name__ == "__main__":
    work_packages = range(100)
    results = [None] * len(work_packages)
    slots = Array("B", [0] * PROCESSES)
    with Pool(PROCESSES, initializer=init_worker, initargs=(tqdm.get_lock(), slots)) as p:
        for package_number, result in tqdm(
            # I use imap_unordered, because it yields each result as soon as it's computed,
            # instead of blocking until the results can be yielded in order. This gives a
            # more accurate progress.
            p.imap_unordered(do_work, work_packages),
            total=len(work_packages),
            position=0,
            desc="Work packages completed",
            leave=True,
        ):
            results[package_number] = result