Created
November 14, 2022 15:21
-
-
Save parsa/31511ccf7c238916ab63ac2558737caa to your computer and use it in GitHub Desktop.
Python - Sharing a queue between multiprocessing.Pool tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing | |
import subprocess | |
import tqdm | |
def work(args): | |
def workimpl(task_id, core_ids_queue): | |
core_id = core_ids_queue.get() | |
tqdm.tqdm.write( | |
f"task {task_id} is running on core {core_id} and the queue size is {core_ids_queue.qsize()}" | |
) | |
# Sleep for .1 seconds | |
subprocess.run(["taskset", "-c", str(core_id), "sleep", ".5"]) | |
# Put the core back in the queue | |
core_ids_queue.put(core_id) | |
tqdm.tqdm.write( | |
f"task {task_id} is done with core {core_id} and the queue size is {core_ids_queue.qsize()}" | |
) | |
return task_id | |
return workimpl(*args) | |
TASKS_COUNT = 105 | |
mp_manager = multiprocessing.Manager() | |
core_ids_queue = mp_manager.Queue() | |
for i in range(2): | |
core_ids_queue.put(i) | |
with multiprocessing.Pool(processes=3) as pool: | |
tasks_args = [(i, core_ids_queue) for i in range(TASKS_COUNT)] | |
mapped_values = list( | |
tqdm.tqdm( | |
pool.imap_unordered(work, tasks_args), | |
total=TASKS_COUNT, | |
) | |
) | |
tqdm.tqdm.write(f"qsize: {core_ids_queue.qsize()}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment