Skip to content

Instantly share code, notes, and snippets.

@NiklasBeierl
Last active March 29, 2023 11:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NiklasBeierl/13096bfdd8b2084da8c1163dd06f91d3 to your computer and use it in GitHub Desktop.
Save NiklasBeierl/13096bfdd8b2084da8c1163dd06f91d3 to your computer and use it in GitHub Desktop.
Usefull progress observation on a multiprocessing pool with tqdm.

About

I wanted to observe the overall and per work-package progress on a python process pool which executes a large set of long-ish running tasks. It turns out that there are some issues in our beloved progressbar library - tqdm - that cause multiprocessed use-cases to mess up your terminal pretty badly.

There seems to be ongoing discussion about the problem here: tqdm/tqdm#1000 In the meantime, I created the workaround below.

My Usecase

I want to track the overall progress (as in percentage of work-packages completed) and the progress of all active processes. Once a process is done, I want its progress bar to disappear, because I have a lot of work packages.

The workaround

The key idea is to use a multiprocessing.Array to keep track of what progress bar positions are currently in use by any of the processes. At first I made a more naive attempt: Enumerating my work packages and then using workpackage_id % num_of_processes for the position, but since my work packages don't all take the exact same amount of time, I would end up with multiple active processes using the same slot.

Running:

Screenshot from 2023-03-29 13-03-16

Finished:

Screenshot from 2023-03-29 13-03-20

The Code

from multiprocessing.pool import Pool
from multiprocessing import Array
from tqdm import tqdm
from time import sleep
from random import randrange
from typing import *

# You could replace that with multiprocessing.cpu_count()
PROCESSES = 20

# This array is shared among all processes and allows them to keep track of which tqdm "positions" are
# occupied / free.
pgb_slots = Array("B", [0] * PROCESSES)


def get_pgb_pos() -> int:
    # Acquire lock and get a progress bar slot
    with pgb_slots.get_lock():
        for i in range(PROCESSES):
            if pgb_slots[i] == 0:
                pgb_slots[i] = 1
                return i


def release_pgb_pos(slot: int):
    with pgb_slots.get_lock():
        pgb_slots[slot] = 0


def do_work(package_number: int) -> Tuple[int, int]:
    pgb_pos = get_pgb_pos()
    try:
        for _ in tqdm(
            range(10),
            total=10,
            desc=f"Work package: {package_number}",
            # +1 so we do not overwrite the overall progress
            position=pgb_pos + 1,
            leave=False,
        ):
            sleep(randrange(1, 3) / 10)
    finally:
        release_pgb_pos(pgb_pos)

    result = package_number
    return package_number, result


if __name__ == "__main__":
    work_packages = range(100)
    results = [None] * len(work_packages)
    with Pool(PROCESSES, initializer=tqdm.set_lock, initargs=(tqdm.get_lock(),)) as p:
        for package_number, result in tqdm(
            # I use imap_unordered, because it will yield any result immediately once its computed, instead of
            # blocking  until the results can be yielded in order, giving a more accurate progress
            p.imap_unordered(do_work, work_packages),
            total=len(work_packages),
            position=0,
            desc="Work packages completed",
            leave=True,
        ):
            results[package_number] = result
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment