Skip to content

Instantly share code, notes, and snippets.

@parsa
Created November 14, 2022 15:21
Show Gist options
  • Save parsa/31511ccf7c238916ab63ac2558737caa to your computer and use it in GitHub Desktop.
Save parsa/31511ccf7c238916ab63ac2558737caa to your computer and use it in GitHub Desktop.
Python - Sharing a queue between multiprocessing.Pool tasks
import multiprocessing
import subprocess
import tqdm
def work(args):
def workimpl(task_id, core_ids_queue):
core_id = core_ids_queue.get()
tqdm.tqdm.write(
f"task {task_id} is running on core {core_id} and the queue size is {core_ids_queue.qsize()}"
)
# Sleep for .1 seconds
subprocess.run(["taskset", "-c", str(core_id), "sleep", ".5"])
# Put the core back in the queue
core_ids_queue.put(core_id)
tqdm.tqdm.write(
f"task {task_id} is done with core {core_id} and the queue size is {core_ids_queue.qsize()}"
)
return task_id
return workimpl(*args)
TASKS_COUNT = 105
mp_manager = multiprocessing.Manager()
core_ids_queue = mp_manager.Queue()
for i in range(2):
core_ids_queue.put(i)
with multiprocessing.Pool(processes=3) as pool:
tasks_args = [(i, core_ids_queue) for i in range(TASKS_COUNT)]
mapped_values = list(
tqdm.tqdm(
pool.imap_unordered(work, tasks_args),
total=TASKS_COUNT,
)
)
tqdm.tqdm.write(f"qsize: {core_ids_queue.qsize()}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment